diff --git a/internal/core/src/config/ConfigKnowhere.cpp b/internal/core/src/config/ConfigKnowhere.cpp index 7578e3fbd5..105f1f9507 100644 --- a/internal/core/src/config/ConfigKnowhere.cpp +++ b/internal/core/src/config/ConfigKnowhere.cpp @@ -21,6 +21,7 @@ #include "easyloggingpp/easylogging++.h" #include "log/Log.h" #include "knowhere/archive/KnowhereConfig.h" +#include "knowhere/common/ThreadPool.h" namespace milvus::config { @@ -73,4 +74,9 @@ KnowhereSetSimdType(const char* value) { } } +void +KnowhereInitThreadPool(const uint32_t num_threads) { + knowhere::ThreadPool::InitGlobalThreadPool(num_threads); +} + } // namespace milvus::config diff --git a/internal/core/src/config/ConfigKnowhere.h b/internal/core/src/config/ConfigKnowhere.h index 29fbc05d03..1a45646ea6 100644 --- a/internal/core/src/config/ConfigKnowhere.h +++ b/internal/core/src/config/ConfigKnowhere.h @@ -25,4 +25,7 @@ KnowhereInitImpl(const char*); std::string KnowhereSetSimdType(const char*); +void +KnowhereInitThreadPool(const uint32_t); + } // namespace milvus::config diff --git a/internal/core/src/segcore/segcore_init_c.cpp b/internal/core/src/segcore/segcore_init_c.cpp index 7ffe6d58a9..8daa8305cc 100644 --- a/internal/core/src/segcore/segcore_init_c.cpp +++ b/internal/core/src/segcore/segcore_init_c.cpp @@ -39,6 +39,11 @@ SegcoreSetNprobe(const int64_t value) { config.set_nprobe(value); } +extern "C" void +SegcoreSetThreadPoolNum(const uint32_t num_threads) { + milvus::config::KnowhereInitThreadPool(num_threads); +} + // return value must be freed by the caller extern "C" char* SegcoreSetSimdType(const char* value) { diff --git a/internal/core/src/segcore/segcore_init_c.h b/internal/core/src/segcore/segcore_init_c.h index 25fdf85b64..b16daf8a0c 100644 --- a/internal/core/src/segcore/segcore_init_c.h +++ b/internal/core/src/segcore/segcore_init_c.h @@ -31,6 +31,9 @@ SegcoreSetNprobe(const int64_t); char* SegcoreSetSimdType(const char*); +void +SegcoreSetThreadPoolNum(const uint32_t num_threads); + #ifdef __cplusplus } #endif diff --git a/internal/core/thirdparty/knowhere/CMakeLists.txt b/internal/core/thirdparty/knowhere/CMakeLists.txt index 5878fc162d..fb2db0303a 100644 --- a/internal/core/thirdparty/knowhere/CMakeLists.txt +++ b/internal/core/thirdparty/knowhere/CMakeLists.txt @@ -11,8 +11,8 @@ # or implied. See the License for the specific language governing permissions and limitations under the License. #------------------------------------------------------------------------------- -set( KNOWHERE_VERSION v1.3.2 ) -set( KNOWHERE_SOURCE_MD5 "a0d6a22963cf9921f1360c518ba833bd") +set( KNOWHERE_VERSION v1.3.4 ) +set( KNOWHERE_SOURCE_MD5 "1a6dcbf87b74940c95dd0d3b3f04f541") if ( DEFINED ENV{MILVUS_KNOWHERE_URL} ) set( KNOWHERE_SOURCE_URL "$ENV{MILVUS_KNOWHERE_URL}" ) diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 186431aea1..1ed2541cad 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -29,11 +29,9 @@ import ( "context" "errors" "io" - "math" "math/rand" "os" "path" - "runtime" "strconv" "sync" "sync/atomic" @@ -41,7 +39,6 @@ import ( "time" "unsafe" - "github.com/panjf2000/ants/v2" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -51,7 +48,6 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/internal/util/concurrency" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/hardware" "github.com/milvus-io/milvus/internal/util/initcore" @@ -101,8 +97,6 @@ type IndexNode struct { initOnce sync.Once stateLock sync.Mutex tasks map[taskKey]*taskInfo - - cgoPool *concurrency.Pool } // NewIndexNode creates a new IndexNode component. @@ -209,27 +203,6 @@ func (i *IndexNode) Init() error { i.closer = trace.InitTracing("index_node") i.initKnowhere() - - // IndexNode will not execute tasks concurrently, so the size of goroutines pool is 1. - i.cgoPool, err = concurrency.NewPool(1, ants.WithPreAlloc(true), - ants.WithExpiryDuration(math.MaxInt64)) - if err != nil { - log.Error("IndexNode init cgo pool failed", zap.Error(err)) - initErr = err - return - } - - sig := make(chan struct{}) - wg := sync.WaitGroup{} - wg.Add(1) - i.cgoPool.Submit(func() (interface{}, error) { - runtime.LockOSThread() - wg.Done() - <-sig - return nil, nil - }) - wg.Wait() - close(sig) }) log.Debug("Init IndexNode finished", zap.Error(initErr)) @@ -419,7 +392,7 @@ func (i *IndexNode) GetNodeID() int64 { return Params.IndexNodeCfg.GetNodeID() } -//ShowConfigurations returns the configurations of indexNode matching req.Pattern +// ShowConfigurations returns the configurations of indexNode matching req.Pattern func (i *IndexNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { if !i.isHealthy() { log.Warn("IndexNode.ShowConfigurations failed", diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go index 2816793f88..5386f925c5 100644 --- a/internal/indexnode/indexnode_service.go +++ b/internal/indexnode/indexnode_service.go @@ -77,7 +77,6 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest nodeID: i.GetNodeID(), tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)), serializedSize: 0, - pool: i.cgoPool, } ret := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index b9bea0d808..172b71b637 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -34,7 +34,6 @@ import ( "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/internal/util/concurrency" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/indexcgowrapper" "github.com/milvus-io/milvus/internal/util/indexparamcheck" @@ -101,7 +100,6 @@ type indexBuildTask struct { tr *timerecord.TimeRecorder statistic indexpb.JobInfo node *IndexNode - pool *concurrency.Pool } func (it *indexBuildTask) Reset() { @@ -245,18 +243,11 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { dType := dataset.DType var err error if dType != schemapb.DataType_None { - _, err = it.pool.Submit(func() (interface{}, error) { - it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams, it.req.GetStorageConfig()) - if err != nil { - return nil, err - } - + it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams, it.req.GetStorageConfig()) + if err == nil { err = it.index.Build(dataset) - if err != nil { - return nil, err - } - return nil, nil - }).Await() + } + if err != nil { log.Ctx(ctx).Error("failed to build index", zap.Error(err)) return err @@ -360,19 +351,13 @@ func (it *indexBuildTask) BuildDiskAnnIndex(ctx context.Context) error { zap.Int64("buildID", it.BuildID), zap.String("index params", string(jsonIndexParams))) - _, err = it.pool.Submit(func() (interface{}, error) { - it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams, it.req.GetStorageConfig()) - if err != nil { - log.Ctx(ctx).Error("failed to create index", zap.Error(err)) - return nil, err - } - + it.index, err = indexcgowrapper.NewCgoIndex(dType, it.newTypeParams, it.newIndexParams, it.req.GetStorageConfig()) + if err != nil { + log.Ctx(ctx).Error("failed to create index", zap.Error(err)) + } else { err = it.index.Build(dataset) - if err != nil { - return nil, err - } - return nil, nil - }).Await() + } + if err != nil { if it.index != nil && it.index.CleanLocalData() != nil { log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed", diff --git a/internal/querynode/meta_replica.go b/internal/querynode/meta_replica.go index c230fe718b..0de3a47ae6 100644 --- a/internal/querynode/meta_replica.go +++ b/internal/querynode/meta_replica.go @@ -38,7 +38,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/util/concurrency" "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/samber/lo" ) @@ -155,8 +154,6 @@ type metaReplica struct { // segmentsBlackList stores segments which are still loading segmentsBlackList typeutil.UniqueSet - - cgoPool *concurrency.Pool } // getSegmentsMemSize get the memory size in bytes of all the Segments @@ -186,7 +183,7 @@ func (replica *metaReplica) printReplica() { log.Info("excludedSegments in collectionReplica", zap.Any("info", replica.excludedSegments)) } -//----------------------------------------------------------------------------------------------------- collection +// ----------------------------------------------------------------------------------------------------- collection // getCollectionIDs gets all the collection ids in the collectionReplica func (replica *metaReplica) getCollectionIDs() []UniqueID { replica.mu.RLock() @@ -395,7 +392,7 @@ func (replica *metaReplica) getSegmentInfosByColID(collectionID UniqueID) []*que return segmentInfos } -//----------------------------------------------------------------------------------------------------- partition +// ----------------------------------------------------------------------------------------------------- partition // addPartition adds a new partition to collection func (replica *metaReplica) addPartition(collectionID UniqueID, partitionID UniqueID) error { replica.mu.Lock() @@ -564,7 +561,7 @@ func (replica *metaReplica) getSegmentIDsPrivate(partitionID UniqueID, segType s return partition.getSegmentIDs(segType) } -//----------------------------------------------------------------------------------------------------- segment +// ----------------------------------------------------------------------------------------------------- segment // addSegment add a new segment to collectionReplica func (replica *metaReplica) addSegment(segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, vChannelID Channel, version UniqueID, segType segmentType) error { replica.mu.Lock() @@ -577,7 +574,7 @@ func (replica *metaReplica) addSegment(segmentID UniqueID, partitionID UniqueID, collection.mu.Lock() defer collection.mu.Unlock() - seg, err := newSegment(collection, segmentID, partitionID, collectionID, vChannelID, segType, version, replica.cgoPool) + seg, err := newSegment(collection, segmentID, partitionID, collectionID, vChannelID, segType, version) if err != nil { return err } @@ -759,13 +756,13 @@ func (replica *metaReplica) getSegmentNum(segType segmentType) int { } } -// getSegmentStatistics returns the statistics of segments in collectionReplica +// getSegmentStatistics returns the statistics of segments in collectionReplica func (replica *metaReplica) getSegmentStatistics() []*internalpb.SegmentStats { // TODO: deprecated return nil } -// removeExcludedSegments will remove excludedSegments from collectionReplica +// removeExcludedSegments will remove excludedSegments from collectionReplica func (replica *metaReplica) removeExcludedSegments(collectionID UniqueID) { replica.mu.Lock() defer replica.mu.Unlock() @@ -869,7 +866,7 @@ func (replica *metaReplica) removeCollectionVDeltaChannel(collectionID UniqueID, } // newCollectionReplica returns a new ReplicaInterface -func newCollectionReplica(pool *concurrency.Pool) ReplicaInterface { +func newCollectionReplica() ReplicaInterface { var replica ReplicaInterface = &metaReplica{ collections: make(map[UniqueID]*Collection), partitions: make(map[UniqueID]*Partition), @@ -879,8 +876,6 @@ func newCollectionReplica(pool *concurrency.Pool) ReplicaInterface { excludedSegments: make(map[UniqueID][]*datapb.SegmentInfo), segmentsBlackList: make(typeutil.UniqueSet), - - cgoPool: pool, } return replica diff --git a/internal/querynode/meta_replica_test.go b/internal/querynode/meta_replica_test.go index 03ca210053..34453b4e11 100644 --- a/internal/querynode/meta_replica_test.go +++ b/internal/querynode/meta_replica_test.go @@ -17,7 +17,6 @@ package querynode import ( - "runtime" "testing" "github.com/stretchr/testify/assert" @@ -25,7 +24,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/util/concurrency" ) func TestMetaReplica_collection(t *testing.T) { @@ -228,9 +226,6 @@ func TestMetaReplica_segment(t *testing.T) { assert.NoError(t, err) defer replica.freeAll() - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - schema := genTestCollectionSchema() collection := replica.addCollection(defaultCollectionID, schema) replica.addPartition(defaultCollectionID, defaultPartitionID) @@ -250,12 +245,12 @@ func TestMetaReplica_segment(t *testing.T) { }, } - segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "", segmentTypeGrowing, defaultSegmentVersion, pool) + segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "", segmentTypeGrowing, defaultSegmentVersion) assert.NoError(t, err) err = replica.setSegment(segment1) assert.NoError(t, err) - segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID, defaultCollectionID, "", segmentTypeSealed, defaultSegmentVersion, pool) + segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID, defaultCollectionID, "", segmentTypeSealed, defaultSegmentVersion) assert.NoError(t, err) segment2.setIndexedFieldInfo(fieldID, indexInfo) err = replica.setSegment(segment2) @@ -277,30 +272,27 @@ func TestMetaReplica_segment(t *testing.T) { assert.NoError(t, err) defer replica.freeAll() - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - schema := genTestCollectionSchema() collection := replica.addCollection(defaultCollectionID, schema) replica.addPartition(defaultCollectionID, defaultPartitionID) replica.addPartition(defaultCollectionID, defaultPartitionID+1) - segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeGrowing, defaultSegmentVersion, pool) + segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeGrowing, defaultSegmentVersion) assert.NoError(t, err) err = replica.setSegment(segment1) assert.NoError(t, err) - segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID+1, defaultCollectionID, "channel2", segmentTypeGrowing, defaultSegmentVersion, pool) + segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID+1, defaultCollectionID, "channel2", segmentTypeGrowing, defaultSegmentVersion) assert.NoError(t, err) err = replica.setSegment(segment2) assert.NoError(t, err) - segment3, err := newSegment(collection, UniqueID(3), defaultPartitionID+1, defaultCollectionID, "channel2", segmentTypeGrowing, defaultSegmentVersion, pool) + segment3, err := newSegment(collection, UniqueID(3), defaultPartitionID+1, defaultCollectionID, "channel2", segmentTypeGrowing, defaultSegmentVersion) assert.NoError(t, err) err = replica.setSegment(segment3) assert.NoError(t, err) - segment4, err := newSegment(collection, UniqueID(4), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeSealed, defaultSegmentVersion, pool) + segment4, err := newSegment(collection, UniqueID(4), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeSealed, defaultSegmentVersion) assert.NoError(t, err) err = replica.setSegment(segment4) assert.NoError(t, err) @@ -352,16 +344,13 @@ func TestMetaReplica_BlackList(t *testing.T) { replica.addPartition(defaultCollectionID, defaultPartitionID) replica.addPartition(defaultCollectionID, defaultPartitionID+1) - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - - segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeSealed, defaultSegmentVersion, pool) + segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeSealed, defaultSegmentVersion) assert.NoError(t, err) - segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID, defaultCollectionID, "channel2", segmentTypeSealed, defaultSegmentVersion, pool) + segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID, defaultCollectionID, "channel2", segmentTypeSealed, defaultSegmentVersion) assert.NoError(t, err) - segment3, err := newSegment(collection, UniqueID(3), defaultPartitionID, defaultCollectionID, "channel2", segmentTypeGrowing, defaultSegmentVersion, pool) + segment3, err := newSegment(collection, UniqueID(3), defaultPartitionID, defaultCollectionID, "channel2", segmentTypeGrowing, defaultSegmentVersion) assert.NoError(t, err) replica.addSegmentsLoadingList([]UniqueID{1, 2, 3}) diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go index 6e5626f23f..b0b2854f43 100644 --- a/internal/querynode/mock_test.go +++ b/internal/querynode/mock_test.go @@ -25,7 +25,6 @@ import ( "math/rand" "path" "path/filepath" - "runtime" "strconv" "github.com/golang/protobuf/proto" @@ -1235,10 +1234,6 @@ func genSealedSegment(schema *schemapb.CollectionSchema, vChannel Channel, msgLength int) (*Segment, error) { col := newCollection(collectionID, schema) - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - if err != nil { - return nil, err - } seg, err := newSegment(col, segmentID, @@ -1246,8 +1241,7 @@ func genSealedSegment(schema *schemapb.CollectionSchema, collectionID, vChannel, segmentTypeSealed, - defaultSegmentVersion, - pool) + defaultSegmentVersion) if err != nil { return nil, err } @@ -1284,28 +1278,20 @@ func genSimpleSealedSegment(msgLength int) (*Segment, error) { } func genSimpleReplica() (ReplicaInterface, error) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - if err != nil { - return nil, err - } - r := newCollectionReplica(pool) + r := newCollectionReplica() schema := genTestCollectionSchema() r.addCollection(defaultCollectionID, schema) - err = r.addPartition(defaultCollectionID, defaultPartitionID) + err := r.addPartition(defaultCollectionID, defaultPartitionID) return r, err } func genSimpleSegmentLoaderWithMqFactory(metaReplica ReplicaInterface, factory msgstream.Factory) (*segmentLoader, error) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(1)) - if err != nil { - return nil, err - } kv, err := genEtcdKV() if err != nil { return nil, err } cm := storage.NewLocalChunkManager(storage.RootPath(defaultLocalStorage)) - return newSegmentLoader(metaReplica, kv, cm, factory, pool), nil + return newSegmentLoader(metaReplica, kv, cm, factory), nil } func genSimpleReplicaWithSealSegment(ctx context.Context) (ReplicaInterface, error) { diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index e5f20fbc81..26d4e00412 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -30,7 +30,6 @@ import "C" import ( "context" "fmt" - "math" "os" "path" "runtime" @@ -78,9 +77,10 @@ var rateCol *rateCollector // services in querynode package. // // QueryNode implements `types.Component`, `types.QueryNode` interfaces. -// `rootCoord` is a grpc client of root coordinator. -// `indexCoord` is a grpc client of index coordinator. -// `stateCode` is current statement of this query node, indicating whether it's healthy. +// +// `rootCoord` is a grpc client of root coordinator. +// `indexCoord` is a grpc client of index coordinator. +// `stateCode` is current statement of this query node, indicating whether it's healthy. type QueryNode struct { queryNodeLoopCtx context.Context queryNodeLoopCancel context.CancelFunc @@ -121,8 +121,6 @@ type QueryNode struct { //shard query service, handles shard-level query & search queryShardService *queryShardService - // cgoPool is the worker pool to control concurrency of cgo call - cgoPool *concurrency.Pool // pool for load/release channel taskPool *concurrency.Pool } @@ -197,6 +195,9 @@ func (node *QueryNode) InitSegcore() { C.SegcoreInit(cEasyloggingYaml) C.free(unsafe.Pointer(cEasyloggingYaml)) + cpuNum := runtime.GOMAXPROCS(0) + C.SegcoreSetThreadPoolNum(C.uint32_t(cpuNum)) + // override segcore chunk size cChunkRows := C.int64_t(Params.QueryNodeCfg.ChunkRows) C.SegcoreSetChunkRows(cChunkRows) @@ -261,13 +262,6 @@ func (node *QueryNode) Init() error { log.Info("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath)) cpuNum := runtime.GOMAXPROCS(0) - node.cgoPool, err = concurrency.NewPool(cpuNum, ants.WithPreAlloc(true), - ants.WithExpiryDuration(math.MaxInt64)) - if err != nil { - log.Error("QueryNode init cgo pool failed", zap.Error(err)) - initError = err - return - } node.taskPool, err = concurrency.NewPool(cpuNum, ants.WithPreAlloc(true)) if err != nil { @@ -276,30 +270,13 @@ func (node *QueryNode) Init() error { return } - // ensure every cgopool go routine is locked with a OS thread - // so openmp in knowhere won't create too much request - sig := make(chan struct{}) - wg := sync.WaitGroup{} - wg.Add(cpuNum) - for i := 0; i < cpuNum; i++ { - node.cgoPool.Submit(func() (interface{}, error) { - runtime.LockOSThread() - wg.Done() - <-sig - return nil, nil - }) - } - wg.Wait() - close(sig) - - node.metaReplica = newCollectionReplica(node.cgoPool) + node.metaReplica = newCollectionReplica() node.loader = newSegmentLoader( node.metaReplica, node.etcdKV, node.vectorStorage, - node.factory, - node.cgoPool) + node.factory) node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.metaReplica, node.tSafeReplica, node.factory) diff --git a/internal/querynode/query_node_test.go b/internal/querynode/query_node_test.go index 5f0645dc2d..55a9c86af0 100644 --- a/internal/querynode/query_node_test.go +++ b/internal/querynode/query_node_test.go @@ -21,7 +21,6 @@ import ( "io/ioutil" "net/url" "os" - "runtime" "sync" "testing" "time" @@ -29,7 +28,6 @@ import ( "github.com/stretchr/testify/assert" "go.etcd.io/etcd/server/v3/embed" - "github.com/milvus-io/milvus/internal/util/concurrency" "github.com/milvus-io/milvus/internal/util/dependency" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" @@ -99,19 +97,14 @@ func newQueryNodeMock() *QueryNode { svr := NewQueryNode(ctx, factory) tsReplica := newTSafeReplica() - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - if err != nil { - panic(err) - } - - replica := newCollectionReplica(pool) + replica := newCollectionReplica() svr.metaReplica = replica svr.dataSyncService = newDataSyncService(ctx, svr.metaReplica, tsReplica, factory) svr.vectorStorage, err = factory.NewPersistentStorageChunkManager(ctx) if err != nil { panic(err) } - svr.loader = newSegmentLoader(svr.metaReplica, etcdKV, svr.vectorStorage, factory, pool) + svr.loader = newSegmentLoader(svr.metaReplica, etcdKV, svr.vectorStorage, factory) svr.etcdKV = etcdKV return svr diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index c4174ec127..92e435bbce 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -34,7 +34,6 @@ import ( "sync" "unsafe" - "github.com/milvus-io/milvus/internal/util/concurrency" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -99,8 +98,6 @@ type Segment struct { // only used by sealed segments currentStat *storage.PkStatistics historyStats []*storage.PkStatistics - - pool *concurrency.Pool } // ID returns the identity number. @@ -171,8 +168,7 @@ func newSegment(collection *Collection, collectionID UniqueID, vChannelID Channel, segType segmentType, - version UniqueID, - pool *concurrency.Pool) (*Segment, error) { + version UniqueID) (*Segment, error) { /* CSegmentInterface NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type); @@ -180,15 +176,9 @@ func newSegment(collection *Collection, var segmentPtr C.CSegmentInterface switch segType { case segmentTypeSealed: - pool.Submit(func() (interface{}, error) { - segmentPtr = C.NewSegment(collection.collectionPtr, C.Sealed, C.int64_t(segmentID)) - return nil, nil - }).Await() + segmentPtr = C.NewSegment(collection.collectionPtr, C.Sealed, C.int64_t(segmentID)) case segmentTypeGrowing: - pool.Submit(func() (interface{}, error) { - segmentPtr = C.NewSegment(collection.collectionPtr, C.Growing, C.int64_t(segmentID)) - return nil, nil - }).Await() + segmentPtr = C.NewSegment(collection.collectionPtr, C.Growing, C.int64_t(segmentID)) default: err := fmt.Errorf("illegal segment type %d when create segment %d", segType, segmentID) log.Warn("create new segment error", @@ -218,7 +208,6 @@ func newSegment(collection *Collection, recentlyModified: atomic.NewBool(false), destroyed: atomic.NewBool(false), historyStats: []*storage.PkStatistics{}, - pool: pool, } return segment, nil @@ -241,10 +230,7 @@ func deleteSegment(segment *Segment) { return } - segment.pool.Submit(func() (interface{}, error) { - C.DeleteSegment(cPtr) - return nil, nil - }).Await() + C.DeleteSegment(cPtr) segment.currentStat = nil segment.historyStats = nil @@ -266,11 +252,8 @@ func (s *Segment) getRealCount() int64 { if !s.healthy() { return -1 } - var rowCount C.int64_t - s.pool.Submit(func() (interface{}, error) { - rowCount = C.GetRealCount(s.segmentPtr) - return nil, nil - }).Await() + + rowCount := C.GetRealCount(s.segmentPtr) return int64(rowCount) } @@ -285,11 +268,8 @@ func (s *Segment) getRowCount() int64 { if !s.healthy() { return -1 } - var rowCount C.int64_t - s.pool.Submit(func() (interface{}, error) { - rowCount = C.GetRowCount(s.segmentPtr) - return nil, nil - }).Await() + + rowCount := C.GetRowCount(s.segmentPtr) return int64(rowCount) } @@ -305,11 +285,7 @@ func (s *Segment) getDeletedCount() int64 { return -1 } - var deletedCount C.int64_t - s.pool.Submit(func() (interface{}, error) { - deletedCount = C.GetRowCount(s.segmentPtr) - return nil, nil - }).Await() + deletedCount := C.GetRowCount(s.segmentPtr) return int64(deletedCount) } @@ -324,11 +300,7 @@ func (s *Segment) getMemSize() int64 { if !s.healthy() { return -1 } - var memoryUsageInBytes C.int64_t - s.pool.Submit(func() (interface{}, error) { - memoryUsageInBytes = C.GetMemoryUsageInBytes(s.segmentPtr) - return nil, nil - }).Await() + memoryUsageInBytes := C.GetMemoryUsageInBytes(s.segmentPtr) return int64(memoryUsageInBytes) } @@ -361,14 +333,11 @@ func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) { zap.String("segmentType", s.segmentType.String()), zap.Bool("loadIndex", loadIndex)) - var status C.CStatus - s.pool.Submit(func() (interface{}, error) { - tr := timerecord.NewTimeRecorder("cgoSearch") - status = C.Search(s.segmentPtr, searchReq.plan.cSearchPlan, searchReq.cPlaceholderGroup, - C.uint64_t(searchReq.timestamp), &searchResult.cSearchResult) - metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) - return nil, nil - }).Await() + tr := timerecord.NewTimeRecorder("cgoSearch") + status := C.Search(s.segmentPtr, searchReq.plan.cSearchPlan, searchReq.cPlaceholderGroup, + C.uint64_t(searchReq.timestamp), &searchResult.cSearchResult) + metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) + if err := HandleCStatus(&status, "Search failed"); err != nil { return nil, err } @@ -390,18 +359,13 @@ func (s *Segment) retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, erro var retrieveResult RetrieveResult ts := C.uint64_t(plan.Timestamp) - var status C.CStatus - s.pool.Submit(func() (interface{}, error) { - tr := timerecord.NewTimeRecorder("cgoRetrieve") - status = C.Retrieve(s.segmentPtr, plan.cRetrievePlan, ts, &retrieveResult.cRetrieveResult) - metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), - metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) - log.Debug("do retrieve on segment", - zap.Int64("msgID", plan.msgID), - zap.Int64("segmentID", s.segmentID), zap.String("segmentType", s.segmentType.String())) - - return nil, nil - }).Await() + tr := timerecord.NewTimeRecorder("cgoRetrieve") + status := C.Retrieve(s.segmentPtr, plan.cRetrievePlan, ts, &retrieveResult.cRetrieveResult) + metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), + metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) + log.Debug("do retrieve on segment", + zap.Int64("msgID", plan.msgID), + zap.Int64("segmentID", s.segmentID), zap.String("segmentType", s.segmentType.String())) if err := HandleCStatus(&status, "Retrieve failed"); err != nil { return nil, err @@ -665,7 +629,7 @@ func (s *Segment) isPKExist(pk primaryKey) bool { return false } -//-------------------------------------------------------------------------------------- interfaces for growing segment +// -------------------------------------------------------------------------------------- interfaces for growing segment func (s *Segment) segmentPreInsert(numOfRecords int) (int64, error) { /* long int @@ -682,12 +646,9 @@ func (s *Segment) segmentPreInsert(numOfRecords int) (int64, error) { } var offset int64 - var status C.CStatus cOffset := (*C.int64_t)(&offset) - s.pool.Submit(func() (interface{}, error) { - status = C.PreInsert(s.segmentPtr, C.int64_t(int64(numOfRecords)), cOffset) - return nil, nil - }).Await() + status := C.PreInsert(s.segmentPtr, C.int64_t(int64(numOfRecords)), cOffset) + if err := HandleCStatus(&status, "PreInsert failed"); err != nil { return 0, err } @@ -705,12 +666,7 @@ func (s *Segment) segmentPreDelete(numOfRecords int) int64 { return -1 } - var offset C.int64_t - s.pool.Submit(func() (interface{}, error) { - offset = C.PreDelete(s.segmentPtr, C.int64_t(int64(numOfRecords))) - - return nil, nil - }).Await() + offset := C.PreDelete(s.segmentPtr, C.int64_t(int64(numOfRecords))) return int64(offset) } @@ -737,19 +693,13 @@ func (s *Segment) segmentInsert(offset int64, entityIDs []UniqueID, timestamps [ var cEntityIdsPtr = (*C.int64_t)(&(entityIDs)[0]) var cTimestampsPtr = (*C.uint64_t)(&(timestamps)[0]) - var status C.CStatus - - s.pool.Submit(func() (interface{}, error) { - status = C.Insert(s.segmentPtr, - cOffset, - cNumOfRows, - cEntityIdsPtr, - cTimestampsPtr, - (*C.uint8_t)(unsafe.Pointer(&insertRecordBlob[0])), - (C.uint64_t)(len(insertRecordBlob))) - - return nil, nil - }).Await() + status := C.Insert(s.segmentPtr, + cOffset, + cNumOfRows, + cEntityIdsPtr, + cTimestampsPtr, + (*C.uint8_t)(unsafe.Pointer(&insertRecordBlob[0])), + (C.uint64_t)(len(insertRecordBlob))) if err := HandleCStatus(&status, "Insert failed"); err != nil { return err @@ -818,12 +768,7 @@ func (s *Segment) segmentDelete(offset int64, entityIDs []primaryKey, timestamps return fmt.Errorf("failed to marshal ids: %s", err) } - var status C.CStatus - s.pool.Submit(func() (interface{}, error) { - status = C.Delete(s.segmentPtr, cOffset, cSize, (*C.uint8_t)(unsafe.Pointer(&dataBlob[0])), (C.uint64_t)(len(dataBlob)), cTimestampsPtr) - - return nil, nil - }).Await() + status := C.Delete(s.segmentPtr, cOffset, cSize, (*C.uint8_t)(unsafe.Pointer(&dataBlob[0])), (C.uint64_t)(len(dataBlob)), cTimestampsPtr) if err := HandleCStatus(&status, "Delete failed"); err != nil { return err @@ -832,7 +777,7 @@ func (s *Segment) segmentDelete(offset int64, entityIDs []primaryKey, timestamps return nil } -//-------------------------------------------------------------------------------------- interfaces for sealed segment +// -------------------------------------------------------------------------------------- interfaces for sealed segment func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *schemapb.FieldData) error { /* CStatus @@ -860,11 +805,7 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *sche row_count: C.int64_t(rowCount), } - var status C.CStatus - s.pool.Submit(func() (interface{}, error) { - status = C.LoadFieldData(s.segmentPtr, loadInfo) - return nil, nil - }).Await() + status := C.LoadFieldData(s.segmentPtr, loadInfo) if err := HandleCStatus(&status, "LoadFieldData failed"); err != nil { return err @@ -930,11 +871,7 @@ func (s *Segment) segmentLoadDeletedRecord(primaryKeys []primaryKey, timestamps CStatus LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_record_info) */ - var status C.CStatus - s.pool.Submit(func() (interface{}, error) { - status = C.LoadDeletedRecord(s.segmentPtr, loadInfo) - return nil, nil - }).Await() + status := C.LoadDeletedRecord(s.segmentPtr, loadInfo) if err := HandleCStatus(&status, "LoadDeletedRecord failed"); err != nil { return err @@ -973,11 +910,7 @@ func (s *Segment) segmentLoadIndexData(bytesIndex [][]byte, indexInfo *querypb.F return fmt.Errorf("%w(segmentID=%d)", ErrSegmentUnhealthy, s.segmentID) } - var status C.CStatus - s.pool.Submit(func() (interface{}, error) { - status = C.UpdateSealedSegmentIndex(s.segmentPtr, loadIndexInfo.cLoadIndexInfo) - return nil, nil - }).Await() + status := C.UpdateSealedSegmentIndex(s.segmentPtr, loadIndexInfo.cLoadIndexInfo) if err := HandleCStatus(&status, "UpdateSealedSegmentIndex failed"); err != nil { return err diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index a8ff0d4d71..7d68d42088 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -73,8 +73,6 @@ type segmentLoader struct { ioPool *concurrency.Pool cpuPool *concurrency.Pool - // cgoPool for all cgo invocation - cgoPool *concurrency.Pool factory msgstream.Factory } @@ -153,7 +151,7 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context, req *querypb.LoadS return nil, err } - segment, err := newSegment(collection, segmentID, partitionID, collectionID, vChannelID, segmentType, req.GetVersion(), loader.cgoPool) + segment, err := newSegment(collection, segmentID, partitionID, collectionID, vChannelID, segmentType, req.GetVersion()) if err != nil { log.Error("load segment failed when create new segment", zap.Int64("partitionID", partitionID), @@ -980,8 +978,7 @@ func newSegmentLoader( metaReplica ReplicaInterface, etcdKV *etcdkv.EtcdKV, cm storage.ChunkManager, - factory msgstream.Factory, - pool *concurrency.Pool) *segmentLoader { + factory msgstream.Factory) *segmentLoader { cpuNum := runtime.GOMAXPROCS(0) ioPoolSize := cpuNum * 8 @@ -1019,7 +1016,6 @@ func newSegmentLoader( // init them later ioPool: ioPool, cpuPool: cpuPool, - cgoPool: pool, factory: factory, } diff --git a/internal/querynode/segment_loader_test.go b/internal/querynode/segment_loader_test.go index b638c3a51e..b0ade78fcd 100644 --- a/internal/querynode/segment_loader_test.go +++ b/internal/querynode/segment_loader_test.go @@ -26,7 +26,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" "go.uber.org/atomic" "github.com/milvus-io/milvus-proto/go-api/commonpb" @@ -39,7 +38,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/internal/util/concurrency" "github.com/milvus-io/milvus/internal/util/funcutil" ) @@ -183,9 +181,6 @@ func TestSegmentLoader_loadSegmentFieldsData(t *testing.T) { loader := node.loader assert.NotNil(t, loader) - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - var fieldPk *schemapb.FieldSchema switch pkType { case schemapb.DataType_Int64: @@ -234,8 +229,7 @@ func TestSegmentLoader_loadSegmentFieldsData(t *testing.T) { defaultCollectionID, defaultDMLChannel, segmentTypeSealed, - defaultSegmentVersion, - pool) + defaultSegmentVersion) assert.Nil(t, err) binlog, _, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema) @@ -367,9 +361,6 @@ func TestSegmentLoader_invalid(t *testing.T) { loader := node.loader assert.NotNil(t, loader) - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - cm := &mocks.ChunkManager{} cm.EXPECT().Read(mock.Anything, mock.AnythingOfType("string")).Return(nil, errors.New("mocked")) @@ -392,8 +383,7 @@ func TestSegmentLoader_invalid(t *testing.T) { defaultCollectionID, defaultDMLChannel, segmentTypeSealed, - defaultSegmentVersion, - pool) + defaultSegmentVersion) assert.Nil(t, err) binlog, _, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema) @@ -410,9 +400,6 @@ func TestSegmentLoader_invalid(t *testing.T) { loader := node.loader assert.NotNil(t, loader) - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - cm := &mocks.ChunkManager{} cm.EXPECT().Read(mock.Anything, mock.AnythingOfType("string")).Return(nil, errors.New("mocked")) @@ -435,8 +422,7 @@ func TestSegmentLoader_invalid(t *testing.T) { defaultCollectionID, defaultDMLChannel, segmentTypeSealed, - defaultSegmentVersion, - pool) + defaultSegmentVersion) assert.Nil(t, err) err = loader.loadFieldIndexData(ctx, segment, &querypb.FieldIndexInfo{ @@ -476,7 +462,7 @@ func TestSegmentLoader_testLoadGrowing(t *testing.T) { collection, err := node.metaReplica.getCollectionByID(defaultCollectionID) assert.NoError(t, err) - segment, err := newSegment(collection, defaultSegmentID+1, defaultPartitionID, defaultCollectionID, defaultDMLChannel, segmentTypeGrowing, defaultSegmentVersion, loader.cgoPool) + segment, err := newSegment(collection, defaultSegmentID+1, defaultPartitionID, defaultCollectionID, defaultDMLChannel, segmentTypeGrowing, defaultSegmentVersion) assert.Nil(t, err) insertData, err := genInsertData(defaultMsgLength, collection.schema) @@ -505,7 +491,7 @@ func TestSegmentLoader_testLoadGrowing(t *testing.T) { collection, err := node.metaReplica.getCollectionByID(defaultCollectionID) assert.NoError(t, err) - segment, err := newSegment(collection, defaultSegmentID+1, defaultPartitionID, defaultCollectionID, defaultDMLChannel, segmentTypeGrowing, defaultSegmentVersion, node.loader.cgoPool) + segment, err := newSegment(collection, defaultSegmentID+1, defaultPartitionID, defaultCollectionID, defaultDMLChannel, segmentTypeGrowing, defaultSegmentVersion) assert.Nil(t, err) insertData, err := genInsertData(defaultMsgLength, collection.schema) diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go index 6e65dcf6e4..c977210fe7 100644 --- a/internal/querynode/segment_test.go +++ b/internal/querynode/segment_test.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "math" - "runtime" "testing" "github.com/golang/protobuf/proto" @@ -35,15 +34,11 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/internal/util/concurrency" "github.com/milvus-io/milvus/internal/util/funcutil" ) -//-------------------------------------------------------------------------------------- constructor and destructor +// -------------------------------------------------------------------------------------- constructor and destructor func TestSegment_newSegment(t *testing.T) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - collectionID := UniqueID(0) schema := genTestCollectionSchema() collectionMeta := genCollectionMeta(collectionID, schema) @@ -52,7 +47,7 @@ func TestSegment_newSegment(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion, pool) + segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion) assert.Nil(t, err) assert.Equal(t, segmentID, segment.segmentID) deleteSegment(segment) @@ -62,15 +57,12 @@ func TestSegment_newSegment(t *testing.T) { _, err = newSegment(collection, defaultSegmentID, defaultPartitionID, - collectionID, "", 100, defaultSegmentVersion, pool) + collectionID, "", 100, defaultSegmentVersion) assert.Error(t, err) }) } func TestSegment_deleteSegment(t *testing.T) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - collectionID := UniqueID(0) schema := genTestCollectionSchema() collectionMeta := genCollectionMeta(collectionID, schema) @@ -79,7 +71,7 @@ func TestSegment_deleteSegment(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion, pool) + segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion) assert.Equal(t, segmentID, segment.segmentID) assert.Nil(t, err) @@ -94,11 +86,8 @@ func TestSegment_deleteSegment(t *testing.T) { }) } -//-------------------------------------------------------------------------------------- stats functions +// -------------------------------------------------------------------------------------- stats functions func TestSegment_getRowCount(t *testing.T) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - collectionID := UniqueID(0) schema := genTestCollectionSchema() @@ -106,7 +95,7 @@ func TestSegment_getRowCount(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion, pool) + segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion) assert.Equal(t, segmentID, segment.segmentID) assert.Nil(t, err) @@ -141,9 +130,6 @@ func TestSegment_getRowCount(t *testing.T) { } func TestSegment_retrieve(t *testing.T) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - collectionID := UniqueID(0) schema := genTestCollectionSchema() @@ -151,7 +137,7 @@ func TestSegment_retrieve(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion, pool) + segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion) assert.Equal(t, segmentID, segment.segmentID) assert.Nil(t, err) @@ -228,9 +214,6 @@ func TestSegment_retrieve(t *testing.T) { } func TestSegment_getDeletedCount(t *testing.T) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - collectionID := UniqueID(0) schema := genTestCollectionSchema() @@ -238,7 +221,7 @@ func TestSegment_getDeletedCount(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion, pool) + segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion) assert.Equal(t, segmentID, segment.segmentID) assert.Nil(t, err) @@ -280,9 +263,6 @@ func TestSegment_getDeletedCount(t *testing.T) { } func TestSegment_getMemSize(t *testing.T) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - collectionID := UniqueID(0) schema := genTestCollectionSchema() @@ -290,7 +270,7 @@ func TestSegment_getMemSize(t *testing.T) { assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion, pool) + segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion) assert.Equal(t, segmentID, segment.segmentID) assert.Nil(t, err) @@ -317,18 +297,15 @@ func TestSegment_getMemSize(t *testing.T) { deleteCollection(collection) } -//-------------------------------------------------------------------------------------- dm & search functions +// -------------------------------------------------------------------------------------- dm & search functions func TestSegment_segmentInsert(t *testing.T) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - collectionID := UniqueID(0) schema := genTestCollectionSchema() collection := newCollection(collectionID, schema) assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion, pool) + segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion) assert.Equal(t, segmentID, segment.segmentID) assert.Nil(t, err) @@ -366,16 +343,13 @@ func TestSegment_segmentInsert(t *testing.T) { } func TestSegment_segmentDelete(t *testing.T) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - collectionID := UniqueID(0) schema := genTestCollectionSchema() collection := newCollection(collectionID, schema) assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion, pool) + segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion) assert.Equal(t, segmentID, segment.segmentID) assert.Nil(t, err) @@ -461,18 +435,15 @@ func TestSegment_segmentSearch(t *testing.T) { deleteCollection(collection) } -//-------------------------------------------------------------------------------------- preDm functions +// -------------------------------------------------------------------------------------- preDm functions func TestSegment_segmentPreInsert(t *testing.T) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - collectionID := UniqueID(0) schema := genTestCollectionSchema() collection := newCollection(collectionID, schema) assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion, pool) + segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion) assert.Equal(t, segmentID, segment.segmentID) assert.Nil(t, err) @@ -485,16 +456,13 @@ func TestSegment_segmentPreInsert(t *testing.T) { } func TestSegment_segmentPreDelete(t *testing.T) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - collectionID := UniqueID(0) schema := genTestCollectionSchema() collection := newCollection(collectionID, schema) assert.Equal(t, collection.ID(), collectionID) segmentID := UniqueID(0) - segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion, pool) + segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, defaultSegmentVersion) assert.Equal(t, segmentID, segment.segmentID) assert.Nil(t, err) @@ -520,9 +488,6 @@ func TestSegment_segmentPreDelete(t *testing.T) { } func TestSegment_segmentLoadDeletedRecord(t *testing.T) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - fieldParam := constFieldParam{ id: 100, dataType: schemapb.DataType_Int64, @@ -542,8 +507,7 @@ func TestSegment_segmentLoadDeletedRecord(t *testing.T) { defaultCollectionID, defaultDMLChannel, segmentTypeSealed, - defaultSegmentVersion, - pool) + defaultSegmentVersion) assert.Nil(t, err) ids := []int64{1, 2, 3} pks := make([]primaryKey, 0) @@ -612,9 +576,6 @@ func TestSegment_indexInfo(t *testing.T) { } func TestSegment_BasicMetrics(t *testing.T) { - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - schema := genTestCollectionSchema() collection := newCollection(defaultCollectionID, schema) segment, err := newSegment(collection, @@ -623,8 +584,7 @@ func TestSegment_BasicMetrics(t *testing.T) { defaultCollectionID, defaultDMLChannel, segmentTypeSealed, - defaultSegmentVersion, - pool) + defaultSegmentVersion) assert.Nil(t, err) t.Run("test id binlog row size", func(t *testing.T) { @@ -663,9 +623,6 @@ func TestSegment_BasicMetrics(t *testing.T) { func TestSegment_fillIndexedFieldsData(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0)) - require.NoError(t, err) - schema := genTestCollectionSchema() collection := newCollection(defaultCollectionID, schema) segment, err := newSegment(collection, @@ -674,8 +631,7 @@ func TestSegment_fillIndexedFieldsData(t *testing.T) { defaultCollectionID, defaultDMLChannel, segmentTypeSealed, - defaultSegmentVersion, - pool) + defaultSegmentVersion) assert.Nil(t, err) vecCM, err := genVectorChunkManager(ctx, collection)