mirror of https://github.com/milvus-io/milvus.git
parent
e5a67682ef
commit
c28f1a24f8
|
@ -43,7 +43,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/util/concurrency"
|
||||
)
|
||||
|
||||
// ReplicaInterface specifies all the methods that the Collection object needs to implement in QueryNode.
|
||||
|
@ -131,8 +130,6 @@ type metaReplica struct {
|
|||
sealedSegments map[UniqueID]*Segment
|
||||
|
||||
excludedSegments map[UniqueID][]*datapb.SegmentInfo // map[collectionID]segmentIDs
|
||||
|
||||
cgoPool *concurrency.Pool
|
||||
}
|
||||
|
||||
// getSegmentsMemSize get the memory size in bytes of all the Segments
|
||||
|
@ -538,7 +535,7 @@ func (replica *metaReplica) addSegment(segmentID UniqueID, partitionID UniqueID,
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
seg, err := newSegment(collection, segmentID, partitionID, collectionID, vChannelID, segType, replica.cgoPool)
|
||||
seg, err := newSegment(collection, segmentID, partitionID, collectionID, vChannelID, segType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -759,7 +756,7 @@ func (replica *metaReplica) freeAll() {
|
|||
}
|
||||
|
||||
// newCollectionReplica returns a new ReplicaInterface
|
||||
func newCollectionReplica(pool *concurrency.Pool) ReplicaInterface {
|
||||
func newCollectionReplica() ReplicaInterface {
|
||||
var replica ReplicaInterface = &metaReplica{
|
||||
collections: make(map[UniqueID]*Collection),
|
||||
partitions: make(map[UniqueID]*Partition),
|
||||
|
@ -767,7 +764,6 @@ func newCollectionReplica(pool *concurrency.Pool) ReplicaInterface {
|
|||
sealedSegments: make(map[UniqueID]*Segment),
|
||||
|
||||
excludedSegments: make(map[UniqueID][]*datapb.SegmentInfo),
|
||||
cgoPool: pool,
|
||||
}
|
||||
|
||||
return replica
|
||||
|
|
|
@ -17,15 +17,11 @@
|
|||
package querynode
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/util/concurrency"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestMetaReplica_collection(t *testing.T) {
|
||||
|
@ -228,9 +224,6 @@ func TestMetaReplica_segment(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
defer replica.freeAll()
|
||||
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
schema := genTestCollectionSchema()
|
||||
collection := replica.addCollection(defaultCollectionID, schema)
|
||||
replica.addPartition(defaultCollectionID, defaultPartitionID)
|
||||
|
@ -250,12 +243,12 @@ func TestMetaReplica_segment(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "", segmentTypeGrowing, pool)
|
||||
segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "", segmentTypeGrowing)
|
||||
assert.NoError(t, err)
|
||||
err = replica.setSegment(segment1)
|
||||
assert.NoError(t, err)
|
||||
|
||||
segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID, defaultCollectionID, "", segmentTypeSealed, pool)
|
||||
segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID, defaultCollectionID, "", segmentTypeSealed)
|
||||
assert.NoError(t, err)
|
||||
segment2.setIndexedFieldInfo(fieldID, indexInfo)
|
||||
err = replica.setSegment(segment2)
|
||||
|
@ -277,30 +270,27 @@ func TestMetaReplica_segment(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
defer replica.freeAll()
|
||||
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
schema := genTestCollectionSchema()
|
||||
collection := replica.addCollection(defaultCollectionID, schema)
|
||||
replica.addPartition(defaultCollectionID, defaultPartitionID)
|
||||
replica.addPartition(defaultCollectionID, defaultPartitionID+1)
|
||||
|
||||
segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeGrowing, pool)
|
||||
segment1, err := newSegment(collection, UniqueID(1), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeGrowing)
|
||||
assert.NoError(t, err)
|
||||
err = replica.setSegment(segment1)
|
||||
assert.NoError(t, err)
|
||||
|
||||
segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID+1, defaultCollectionID, "channel2", segmentTypeGrowing, pool)
|
||||
segment2, err := newSegment(collection, UniqueID(2), defaultPartitionID+1, defaultCollectionID, "channel2", segmentTypeGrowing)
|
||||
assert.NoError(t, err)
|
||||
err = replica.setSegment(segment2)
|
||||
assert.NoError(t, err)
|
||||
|
||||
segment3, err := newSegment(collection, UniqueID(3), defaultPartitionID+1, defaultCollectionID, "channel2", segmentTypeGrowing, pool)
|
||||
segment3, err := newSegment(collection, UniqueID(3), defaultPartitionID+1, defaultCollectionID, "channel2", segmentTypeGrowing)
|
||||
assert.NoError(t, err)
|
||||
err = replica.setSegment(segment3)
|
||||
assert.NoError(t, err)
|
||||
|
||||
segment4, err := newSegment(collection, UniqueID(4), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeSealed, pool)
|
||||
segment4, err := newSegment(collection, UniqueID(4), defaultPartitionID, defaultCollectionID, "channel1", segmentTypeSealed)
|
||||
assert.NoError(t, err)
|
||||
err = replica.setSegment(segment4)
|
||||
assert.NoError(t, err)
|
||||
|
|
|
@ -24,10 +24,8 @@ import (
|
|||
"math"
|
||||
"math/rand"
|
||||
"path"
|
||||
"runtime"
|
||||
"strconv"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/concurrency"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
@ -1212,18 +1210,13 @@ func genSealedSegment(schema *schemapb.CollectionSchema,
|
|||
vChannel Channel,
|
||||
msgLength int) (*Segment, error) {
|
||||
col := newCollection(collectionID, schema)
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
seg, err := newSegment(col,
|
||||
segmentID,
|
||||
partitionID,
|
||||
collectionID,
|
||||
vChannel,
|
||||
segmentTypeSealed,
|
||||
pool)
|
||||
segmentTypeSealed)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1260,28 +1253,20 @@ func genSimpleSealedSegment(msgLength int) (*Segment, error) {
|
|||
}
|
||||
|
||||
func genSimpleReplica() (ReplicaInterface, error) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r := newCollectionReplica(pool)
|
||||
r := newCollectionReplica()
|
||||
schema := genTestCollectionSchema()
|
||||
r.addCollection(defaultCollectionID, schema)
|
||||
err = r.addPartition(defaultCollectionID, defaultPartitionID)
|
||||
err := r.addPartition(defaultCollectionID, defaultPartitionID)
|
||||
return r, err
|
||||
}
|
||||
|
||||
func genSimpleSegmentLoaderWithMqFactory(metaReplica ReplicaInterface, factory msgstream.Factory) (*segmentLoader, error) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(1))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
kv, err := genEtcdKV()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cm := storage.NewLocalChunkManager(storage.RootPath(defaultLocalStorage))
|
||||
return newSegmentLoader(metaReplica, kv, cm, factory, pool), nil
|
||||
return newSegmentLoader(metaReplica, kv, cm, factory), nil
|
||||
}
|
||||
|
||||
func genSimpleReplicaWithSealSegment(ctx context.Context) (ReplicaInterface, error) {
|
||||
|
|
|
@ -34,11 +34,9 @@ import "C"
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -47,7 +45,6 @@ import (
|
|||
"unsafe"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/zap"
|
||||
|
@ -61,7 +58,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util"
|
||||
"github.com/milvus-io/milvus/internal/util/concurrency"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/sessionutil"
|
||||
|
@ -127,9 +123,6 @@ type QueryNode struct {
|
|||
ShardClusterService *ShardClusterService
|
||||
//shard query service, handles shard-level query & search
|
||||
queryShardService *queryShardService
|
||||
|
||||
// cgoPool is the worker pool to control concurrency of cgo call
|
||||
cgoPool *concurrency.Pool
|
||||
}
|
||||
|
||||
// NewQueryNode will return a QueryNode with abnormal state.
|
||||
|
@ -242,39 +235,13 @@ func (node *QueryNode) Init() error {
|
|||
node.etcdKV = etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath)
|
||||
log.Info("queryNode try to connect etcd success", zap.Any("MetaRootPath", Params.EtcdCfg.MetaRootPath))
|
||||
|
||||
cpuNum := runtime.GOMAXPROCS(0)
|
||||
node.cgoPool, err = concurrency.NewPool(cpuNum, ants.WithPreAlloc(true),
|
||||
ants.WithExpiryDuration(math.MaxInt64))
|
||||
if err != nil {
|
||||
log.Error("QueryNode init cgo pool failed", zap.Error(err))
|
||||
initError = err
|
||||
return
|
||||
}
|
||||
|
||||
// ensure every cgopool go routine is locked with a OS thread
|
||||
// so openmp in knowhere won't create too much request
|
||||
sig := make(chan struct{})
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(cpuNum)
|
||||
for i := 0; i < cpuNum; i++ {
|
||||
node.cgoPool.Submit(func() (interface{}, error) {
|
||||
runtime.LockOSThread()
|
||||
wg.Done()
|
||||
<-sig
|
||||
return nil, nil
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
close(sig)
|
||||
|
||||
node.metaReplica = newCollectionReplica(node.cgoPool)
|
||||
node.metaReplica = newCollectionReplica()
|
||||
|
||||
node.loader = newSegmentLoader(
|
||||
node.metaReplica,
|
||||
node.etcdKV,
|
||||
node.vectorStorage,
|
||||
node.factory,
|
||||
node.cgoPool)
|
||||
node.factory)
|
||||
|
||||
node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.metaReplica, node.tSafeReplica, node.factory)
|
||||
|
||||
|
|
|
@ -22,7 +22,6 @@ import (
|
|||
"math/rand"
|
||||
"net/url"
|
||||
"os"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -33,7 +32,6 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
"go.etcd.io/etcd/server/v3/embed"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/concurrency"
|
||||
"github.com/milvus-io/milvus/internal/util/dependency"
|
||||
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
|
@ -97,12 +95,7 @@ func newQueryNodeMock() *QueryNode {
|
|||
svr := NewQueryNode(ctx, factory)
|
||||
tsReplica := newTSafeReplica()
|
||||
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
replica := newCollectionReplica(pool)
|
||||
replica := newCollectionReplica()
|
||||
svr.metaReplica = replica
|
||||
svr.dataSyncService = newDataSyncService(ctx, svr.metaReplica, tsReplica, factory)
|
||||
svr.statsService = newStatsService(ctx, svr.metaReplica, factory)
|
||||
|
@ -114,7 +107,7 @@ func newQueryNodeMock() *QueryNode {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
svr.loader = newSegmentLoader(svr.metaReplica, etcdKV, svr.vectorStorage, factory, pool)
|
||||
svr.loader = newSegmentLoader(svr.metaReplica, etcdKV, svr.vectorStorage, factory)
|
||||
svr.etcdKV = etcdKV
|
||||
|
||||
return svr
|
||||
|
|
|
@ -36,7 +36,6 @@ import (
|
|||
"sync"
|
||||
"unsafe"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/concurrency"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
|
@ -98,8 +97,6 @@ type Segment struct {
|
|||
indexedFieldInfos map[UniqueID]*IndexedFieldInfo
|
||||
|
||||
pkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
|
||||
|
||||
pool *concurrency.Pool
|
||||
}
|
||||
|
||||
// ID returns the identity number.
|
||||
|
@ -168,7 +165,7 @@ func (s *Segment) hasLoadIndexForIndexedField(fieldID int64) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, vChannelID Channel, segType segmentType, pool *concurrency.Pool) (*Segment, error) {
|
||||
func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, vChannelID Channel, segType segmentType) (*Segment, error) {
|
||||
/*
|
||||
CSegmentInterface
|
||||
NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type);
|
||||
|
@ -176,15 +173,9 @@ func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID
|
|||
var segmentPtr C.CSegmentInterface
|
||||
switch segType {
|
||||
case segmentTypeSealed:
|
||||
pool.Submit(func() (interface{}, error) {
|
||||
segmentPtr = C.NewSegment(collection.collectionPtr, C.Sealed, C.int64_t(segmentID))
|
||||
return nil, nil
|
||||
}).Await()
|
||||
segmentPtr = C.NewSegment(collection.collectionPtr, C.Sealed, C.int64_t(segmentID))
|
||||
case segmentTypeGrowing:
|
||||
pool.Submit(func() (interface{}, error) {
|
||||
segmentPtr = C.NewSegment(collection.collectionPtr, C.Growing, C.int64_t(segmentID))
|
||||
return nil, nil
|
||||
}).Await()
|
||||
segmentPtr = C.NewSegment(collection.collectionPtr, C.Growing, C.int64_t(segmentID))
|
||||
default:
|
||||
err := fmt.Errorf("illegal segment type %d when create segment %d", segType, segmentID)
|
||||
log.Error("create new segment error",
|
||||
|
@ -212,7 +203,6 @@ func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID
|
|||
indexedFieldInfos: make(map[UniqueID]*IndexedFieldInfo),
|
||||
|
||||
pkFilter: bloom.NewWithEstimates(bloomFilterSize, maxBloomFalsePositive),
|
||||
pool: pool,
|
||||
}
|
||||
|
||||
return segment, nil
|
||||
|
@ -228,10 +218,8 @@ func deleteSegment(segment *Segment) {
|
|||
}
|
||||
|
||||
cPtr := segment.segmentPtr
|
||||
segment.pool.Submit(func() (interface{}, error) {
|
||||
C.DeleteSegment(cPtr)
|
||||
return nil, nil
|
||||
}).Await()
|
||||
C.DeleteSegment(cPtr)
|
||||
|
||||
segment.segmentPtr = nil
|
||||
|
||||
log.Info("delete segment from memory",
|
||||
|
@ -249,11 +237,7 @@ func (s *Segment) getRowCount() int64 {
|
|||
if s.segmentPtr == nil {
|
||||
return -1
|
||||
}
|
||||
var rowCount C.int64_t
|
||||
s.pool.Submit(func() (interface{}, error) {
|
||||
rowCount = C.GetRowCount(s.segmentPtr)
|
||||
return nil, nil
|
||||
}).Await()
|
||||
var rowCount C.int64_t = C.GetRowCount(s.segmentPtr)
|
||||
|
||||
return int64(rowCount)
|
||||
}
|
||||
|
@ -267,11 +251,7 @@ func (s *Segment) getDeletedCount() int64 {
|
|||
return -1
|
||||
}
|
||||
|
||||
var deletedCount C.int64_t
|
||||
s.pool.Submit(func() (interface{}, error) {
|
||||
deletedCount = C.GetRowCount(s.segmentPtr)
|
||||
return nil, nil
|
||||
}).Await()
|
||||
var deletedCount C.int64_t = C.GetRowCount(s.segmentPtr)
|
||||
|
||||
return int64(deletedCount)
|
||||
}
|
||||
|
@ -284,11 +264,7 @@ func (s *Segment) getMemSize() int64 {
|
|||
if s.segmentPtr == nil {
|
||||
return -1
|
||||
}
|
||||
var memoryUsageInBytes C.int64_t
|
||||
s.pool.Submit(func() (interface{}, error) {
|
||||
memoryUsageInBytes = C.GetMemoryUsageInBytes(s.segmentPtr)
|
||||
return nil, nil
|
||||
}).Await()
|
||||
var memoryUsageInBytes C.int64_t = C.GetMemoryUsageInBytes(s.segmentPtr)
|
||||
|
||||
return int64(memoryUsageInBytes)
|
||||
}
|
||||
|
@ -319,14 +295,11 @@ func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) {
|
|||
zap.String("segmentType", s.segmentType.String()),
|
||||
zap.Bool("loadIndex", loadIndex))
|
||||
|
||||
var status C.CStatus
|
||||
s.pool.Submit(func() (interface{}, error) {
|
||||
tr := timerecord.NewTimeRecorder("cgoSearch")
|
||||
status = C.Search(s.segmentPtr, searchReq.plan.cSearchPlan, searchReq.cPlaceholderGroup,
|
||||
C.uint64_t(searchReq.timestamp), &searchResult.cSearchResult, C.int64_t(s.segmentID))
|
||||
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return nil, nil
|
||||
}).Await()
|
||||
tr := timerecord.NewTimeRecorder("cgoSearch")
|
||||
var status C.CStatus = C.Search(s.segmentPtr, searchReq.plan.cSearchPlan, searchReq.cPlaceholderGroup,
|
||||
C.uint64_t(searchReq.timestamp), &searchResult.cSearchResult, C.int64_t(s.segmentID))
|
||||
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
if err := HandleCStatus(&status, "Search failed"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -346,18 +319,13 @@ func (s *Segment) retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, erro
|
|||
var retrieveResult RetrieveResult
|
||||
ts := C.uint64_t(plan.Timestamp)
|
||||
|
||||
var status C.CStatus
|
||||
s.pool.Submit(func() (interface{}, error) {
|
||||
tr := timerecord.NewTimeRecorder("cgoRetrieve")
|
||||
status = C.Retrieve(s.segmentPtr, plan.cRetrievePlan, ts, &retrieveResult.cRetrieveResult)
|
||||
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
|
||||
metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
log.Debug("do retrieve on segment",
|
||||
zap.Int64("msgID", plan.msgID),
|
||||
zap.Int64("segmentID", s.segmentID), zap.String("segmentType", s.segmentType.String()))
|
||||
|
||||
return nil, nil
|
||||
}).Await()
|
||||
tr := timerecord.NewTimeRecorder("cgoRetrieve")
|
||||
var status C.CStatus = C.Retrieve(s.segmentPtr, plan.cRetrievePlan, ts, &retrieveResult.cRetrieveResult)
|
||||
metrics.QueryNodeSQSegmentLatencyInCore.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
|
||||
metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
log.Debug("do retrieve on segment",
|
||||
zap.Int64("msgID", plan.msgID),
|
||||
zap.Int64("segmentID", s.segmentID), zap.String("segmentType", s.segmentType.String()))
|
||||
|
||||
if err := HandleCStatus(&status, "Retrieve failed"); err != nil {
|
||||
return nil, err
|
||||
|
@ -601,10 +569,9 @@ func (s *Segment) segmentPreInsert(numOfRecords int) (int64, error) {
|
|||
var offset int64
|
||||
var status C.CStatus
|
||||
cOffset := (*C.int64_t)(&offset)
|
||||
s.pool.Submit(func() (interface{}, error) {
|
||||
status = C.PreInsert(s.segmentPtr, C.int64_t(int64(numOfRecords)), cOffset)
|
||||
return nil, nil
|
||||
}).Await()
|
||||
|
||||
status = C.PreInsert(s.segmentPtr, C.int64_t(int64(numOfRecords)), cOffset)
|
||||
|
||||
if err := HandleCStatus(&status, "PreInsert failed"); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -617,12 +584,7 @@ func (s *Segment) segmentPreDelete(numOfRecords int) int64 {
|
|||
PreDelete(CSegmentInterface c_segment, long int size);
|
||||
*/
|
||||
|
||||
var offset C.int64_t
|
||||
s.pool.Submit(func() (interface{}, error) {
|
||||
offset = C.PreDelete(s.segmentPtr, C.int64_t(int64(numOfRecords)))
|
||||
|
||||
return nil, nil
|
||||
}).Await()
|
||||
var offset C.int64_t = C.PreDelete(s.segmentPtr, C.int64_t(int64(numOfRecords)))
|
||||
|
||||
return int64(offset)
|
||||
}
|
||||
|
@ -647,19 +609,13 @@ func (s *Segment) segmentInsert(offset int64, entityIDs []UniqueID, timestamps [
|
|||
var cEntityIdsPtr = (*C.int64_t)(&(entityIDs)[0])
|
||||
var cTimestampsPtr = (*C.uint64_t)(&(timestamps)[0])
|
||||
|
||||
var status C.CStatus
|
||||
|
||||
s.pool.Submit(func() (interface{}, error) {
|
||||
status = C.Insert(s.segmentPtr,
|
||||
cOffset,
|
||||
cNumOfRows,
|
||||
cEntityIdsPtr,
|
||||
cTimestampsPtr,
|
||||
(*C.uint8_t)(unsafe.Pointer(&insertRecordBlob[0])),
|
||||
(C.uint64_t)(len(insertRecordBlob)))
|
||||
|
||||
return nil, nil
|
||||
}).Await()
|
||||
var status C.CStatus = C.Insert(s.segmentPtr,
|
||||
cOffset,
|
||||
cNumOfRows,
|
||||
cEntityIdsPtr,
|
||||
cTimestampsPtr,
|
||||
(*C.uint8_t)(unsafe.Pointer(&insertRecordBlob[0])),
|
||||
(C.uint64_t)(len(insertRecordBlob)))
|
||||
|
||||
if err := HandleCStatus(&status, "Insert failed"); err != nil {
|
||||
return err
|
||||
|
@ -726,12 +682,7 @@ func (s *Segment) segmentDelete(offset int64, entityIDs []primaryKey, timestamps
|
|||
return fmt.Errorf("failed to marshal ids: %s", err)
|
||||
}
|
||||
|
||||
var status C.CStatus
|
||||
s.pool.Submit(func() (interface{}, error) {
|
||||
status = C.Delete(s.segmentPtr, cOffset, cSize, (*C.uint8_t)(unsafe.Pointer(&dataBlob[0])), (C.uint64_t)(len(dataBlob)), cTimestampsPtr)
|
||||
|
||||
return nil, nil
|
||||
}).Await()
|
||||
var status C.CStatus = C.Delete(s.segmentPtr, cOffset, cSize, (*C.uint8_t)(unsafe.Pointer(&dataBlob[0])), (C.uint64_t)(len(dataBlob)), cTimestampsPtr)
|
||||
|
||||
if err := HandleCStatus(&status, "Delete failed"); err != nil {
|
||||
return err
|
||||
|
@ -766,11 +717,7 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *sche
|
|||
row_count: C.int64_t(rowCount),
|
||||
}
|
||||
|
||||
var status C.CStatus
|
||||
s.pool.Submit(func() (interface{}, error) {
|
||||
status = C.LoadFieldData(s.segmentPtr, loadInfo)
|
||||
return nil, nil
|
||||
}).Await()
|
||||
var status C.CStatus = C.LoadFieldData(s.segmentPtr, loadInfo)
|
||||
|
||||
if err := HandleCStatus(&status, "LoadFieldData failed"); err != nil {
|
||||
return err
|
||||
|
@ -834,11 +781,7 @@ func (s *Segment) segmentLoadDeletedRecord(primaryKeys []primaryKey, timestamps
|
|||
CStatus
|
||||
LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_record_info)
|
||||
*/
|
||||
var status C.CStatus
|
||||
s.pool.Submit(func() (interface{}, error) {
|
||||
status = C.LoadDeletedRecord(s.segmentPtr, loadInfo)
|
||||
return nil, nil
|
||||
}).Await()
|
||||
var status C.CStatus = C.LoadDeletedRecord(s.segmentPtr, loadInfo)
|
||||
|
||||
if err := HandleCStatus(&status, "LoadDeletedRecord failed"); err != nil {
|
||||
return err
|
||||
|
@ -872,11 +815,7 @@ func (s *Segment) segmentLoadIndexData(bytesIndex [][]byte, indexInfo *querypb.F
|
|||
return errors.New(errMsg)
|
||||
}
|
||||
|
||||
var status C.CStatus
|
||||
s.pool.Submit(func() (interface{}, error) {
|
||||
status = C.UpdateSealedSegmentIndex(s.segmentPtr, loadIndexInfo.cLoadIndexInfo)
|
||||
return nil, nil
|
||||
}).Await()
|
||||
var status C.CStatus = C.UpdateSealedSegmentIndex(s.segmentPtr, loadIndexInfo.cLoadIndexInfo)
|
||||
|
||||
if err := HandleCStatus(&status, "UpdateSealedSegmentIndex failed"); err != nil {
|
||||
return err
|
||||
|
|
|
@ -65,8 +65,6 @@ type segmentLoader struct {
|
|||
|
||||
ioPool *concurrency.Pool
|
||||
cpuPool *concurrency.Pool
|
||||
// cgoPool for all cgo invocation
|
||||
cgoPool *concurrency.Pool
|
||||
|
||||
factory msgstream.Factory
|
||||
}
|
||||
|
@ -147,7 +145,7 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme
|
|||
return err
|
||||
}
|
||||
|
||||
segment, err := newSegment(collection, segmentID, partitionID, collectionID, vChannelID, segmentType, loader.cgoPool)
|
||||
segment, err := newSegment(collection, segmentID, partitionID, collectionID, vChannelID, segmentType)
|
||||
if err != nil {
|
||||
log.Error("load segment failed when create new segment",
|
||||
zap.Int64("collectionID", collectionID),
|
||||
|
@ -837,8 +835,7 @@ func newSegmentLoader(
|
|||
metaReplica ReplicaInterface,
|
||||
etcdKV *etcdkv.EtcdKV,
|
||||
cm storage.ChunkManager,
|
||||
factory msgstream.Factory,
|
||||
pool *concurrency.Pool) *segmentLoader {
|
||||
factory msgstream.Factory) *segmentLoader {
|
||||
|
||||
cpuNum := runtime.GOMAXPROCS(0)
|
||||
// This error is not nil only if the options of creating pool is invalid
|
||||
|
@ -873,7 +870,6 @@ func newSegmentLoader(
|
|||
// init them later
|
||||
ioPool: ioPool,
|
||||
cpuPool: cpuPool,
|
||||
cgoPool: pool,
|
||||
|
||||
factory: factory,
|
||||
}
|
||||
|
|
|
@ -24,10 +24,6 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/mq/msgstream"
|
||||
|
@ -36,8 +32,9 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util/concurrency"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
func TestSegmentLoader_loadSegment(t *testing.T) {
|
||||
|
@ -133,9 +130,6 @@ func TestSegmentLoader_loadSegmentFieldsData(t *testing.T) {
|
|||
loader := node.loader
|
||||
assert.NotNil(t, loader)
|
||||
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
var fieldPk *schemapb.FieldSchema
|
||||
switch pkType {
|
||||
case schemapb.DataType_Int64:
|
||||
|
@ -183,8 +177,7 @@ func TestSegmentLoader_loadSegmentFieldsData(t *testing.T) {
|
|||
defaultPartitionID,
|
||||
defaultCollectionID,
|
||||
defaultDMLChannel,
|
||||
segmentTypeSealed,
|
||||
pool)
|
||||
segmentTypeSealed)
|
||||
assert.Nil(t, err)
|
||||
|
||||
binlog, _, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
|
||||
|
@ -337,7 +330,7 @@ func TestSegmentLoader_testLoadGrowing(t *testing.T) {
|
|||
collection, err := node.metaReplica.getCollectionByID(defaultCollectionID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
segment, err := newSegment(collection, defaultSegmentID+1, defaultPartitionID, defaultCollectionID, defaultDMLChannel, segmentTypeGrowing, loader.cgoPool)
|
||||
segment, err := newSegment(collection, defaultSegmentID+1, defaultPartitionID, defaultCollectionID, defaultDMLChannel, segmentTypeGrowing)
|
||||
assert.Nil(t, err)
|
||||
|
||||
insertData, err := genInsertData(defaultMsgLength, collection.schema)
|
||||
|
@ -366,7 +359,7 @@ func TestSegmentLoader_testLoadGrowing(t *testing.T) {
|
|||
collection, err := node.metaReplica.getCollectionByID(defaultCollectionID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
segment, err := newSegment(collection, defaultSegmentID+1, defaultPartitionID, defaultCollectionID, defaultDMLChannel, segmentTypeGrowing, node.loader.cgoPool)
|
||||
segment, err := newSegment(collection, defaultSegmentID+1, defaultPartitionID, defaultCollectionID, defaultDMLChannel, segmentTypeGrowing)
|
||||
assert.Nil(t, err)
|
||||
|
||||
insertData, err := genInsertData(defaultMsgLength, collection.schema)
|
||||
|
|
|
@ -21,7 +21,6 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
|
@ -29,24 +28,18 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/storage"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/planpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/segcorepb"
|
||||
"github.com/milvus-io/milvus/internal/util/concurrency"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
//-------------------------------------------------------------------------------------- constructor and destructor
|
||||
func TestSegment_newSegment(t *testing.T) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
collectionID := UniqueID(0)
|
||||
schema := genTestCollectionSchema()
|
||||
collectionMeta := genCollectionMeta(collectionID, schema)
|
||||
|
@ -55,7 +48,7 @@ func TestSegment_newSegment(t *testing.T) {
|
|||
assert.Equal(t, collection.ID(), collectionID)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, pool)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, segmentID, segment.segmentID)
|
||||
deleteSegment(segment)
|
||||
|
@ -65,15 +58,12 @@ func TestSegment_newSegment(t *testing.T) {
|
|||
_, err = newSegment(collection,
|
||||
defaultSegmentID,
|
||||
defaultPartitionID,
|
||||
collectionID, "", 100, pool)
|
||||
collectionID, "", 100)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSegment_deleteSegment(t *testing.T) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
collectionID := UniqueID(0)
|
||||
schema := genTestCollectionSchema()
|
||||
collectionMeta := genCollectionMeta(collectionID, schema)
|
||||
|
@ -82,7 +72,7 @@ func TestSegment_deleteSegment(t *testing.T) {
|
|||
assert.Equal(t, collection.ID(), collectionID)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, pool)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing)
|
||||
assert.Equal(t, segmentID, segment.segmentID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
|
@ -99,9 +89,6 @@ func TestSegment_deleteSegment(t *testing.T) {
|
|||
|
||||
//-------------------------------------------------------------------------------------- stats functions
|
||||
func TestSegment_getRowCount(t *testing.T) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
collectionID := UniqueID(0)
|
||||
schema := genTestCollectionSchema()
|
||||
|
||||
|
@ -109,7 +96,7 @@ func TestSegment_getRowCount(t *testing.T) {
|
|||
assert.Equal(t, collection.ID(), collectionID)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, pool)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing)
|
||||
assert.Equal(t, segmentID, segment.segmentID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
|
@ -144,9 +131,6 @@ func TestSegment_getRowCount(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSegment_retrieve(t *testing.T) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
collectionID := UniqueID(0)
|
||||
schema := genTestCollectionSchema()
|
||||
|
||||
|
@ -154,7 +138,7 @@ func TestSegment_retrieve(t *testing.T) {
|
|||
assert.Equal(t, collection.ID(), collectionID)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, pool)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing)
|
||||
assert.Equal(t, segmentID, segment.segmentID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
|
@ -226,9 +210,6 @@ func TestSegment_retrieve(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSegment_getDeletedCount(t *testing.T) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
collectionID := UniqueID(0)
|
||||
schema := genTestCollectionSchema()
|
||||
|
||||
|
@ -236,7 +217,7 @@ func TestSegment_getDeletedCount(t *testing.T) {
|
|||
assert.Equal(t, collection.ID(), collectionID)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, pool)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing)
|
||||
assert.Equal(t, segmentID, segment.segmentID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
|
@ -277,9 +258,6 @@ func TestSegment_getDeletedCount(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSegment_getMemSize(t *testing.T) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
collectionID := UniqueID(0)
|
||||
schema := genTestCollectionSchema()
|
||||
|
||||
|
@ -287,7 +265,7 @@ func TestSegment_getMemSize(t *testing.T) {
|
|||
assert.Equal(t, collection.ID(), collectionID)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, pool)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing)
|
||||
assert.Equal(t, segmentID, segment.segmentID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
|
@ -316,16 +294,13 @@ func TestSegment_getMemSize(t *testing.T) {
|
|||
|
||||
//-------------------------------------------------------------------------------------- dm & search functions
|
||||
func TestSegment_segmentInsert(t *testing.T) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
collectionID := UniqueID(0)
|
||||
schema := genTestCollectionSchema()
|
||||
|
||||
collection := newCollection(collectionID, schema)
|
||||
assert.Equal(t, collection.ID(), collectionID)
|
||||
segmentID := UniqueID(0)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, pool)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing)
|
||||
assert.Equal(t, segmentID, segment.segmentID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
|
@ -363,16 +338,13 @@ func TestSegment_segmentInsert(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSegment_segmentDelete(t *testing.T) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
collectionID := UniqueID(0)
|
||||
schema := genTestCollectionSchema()
|
||||
collection := newCollection(collectionID, schema)
|
||||
assert.Equal(t, collection.ID(), collectionID)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, pool)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing)
|
||||
assert.Equal(t, segmentID, segment.segmentID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
|
@ -462,16 +434,13 @@ func TestSegment_segmentSearch(t *testing.T) {
|
|||
|
||||
//-------------------------------------------------------------------------------------- preDm functions
|
||||
func TestSegment_segmentPreInsert(t *testing.T) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
collectionID := UniqueID(0)
|
||||
schema := genTestCollectionSchema()
|
||||
collection := newCollection(collectionID, schema)
|
||||
assert.Equal(t, collection.ID(), collectionID)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, pool)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing)
|
||||
assert.Equal(t, segmentID, segment.segmentID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
|
@ -484,16 +453,13 @@ func TestSegment_segmentPreInsert(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSegment_segmentPreDelete(t *testing.T) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
collectionID := UniqueID(0)
|
||||
schema := genTestCollectionSchema()
|
||||
collection := newCollection(collectionID, schema)
|
||||
assert.Equal(t, collection.ID(), collectionID)
|
||||
|
||||
segmentID := UniqueID(0)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing, pool)
|
||||
segment, err := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeGrowing)
|
||||
assert.Equal(t, segmentID, segment.segmentID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
|
@ -519,9 +485,6 @@ func TestSegment_segmentPreDelete(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSegment_segmentLoadDeletedRecord(t *testing.T) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
fieldParam := constFieldParam{
|
||||
id: 100,
|
||||
dataType: schemapb.DataType_Int64,
|
||||
|
@ -540,8 +503,7 @@ func TestSegment_segmentLoadDeletedRecord(t *testing.T) {
|
|||
defaultPartitionID,
|
||||
defaultCollectionID,
|
||||
defaultDMLChannel,
|
||||
segmentTypeSealed,
|
||||
pool)
|
||||
segmentTypeSealed)
|
||||
assert.Nil(t, err)
|
||||
ids := []int64{1, 2, 3}
|
||||
pks := make([]primaryKey, 0)
|
||||
|
@ -610,9 +572,6 @@ func TestSegment_indexInfo(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSegment_BasicMetrics(t *testing.T) {
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
schema := genTestCollectionSchema()
|
||||
collection := newCollection(defaultCollectionID, schema)
|
||||
segment, err := newSegment(collection,
|
||||
|
@ -620,8 +579,7 @@ func TestSegment_BasicMetrics(t *testing.T) {
|
|||
defaultPartitionID,
|
||||
defaultCollectionID,
|
||||
defaultDMLChannel,
|
||||
segmentTypeSealed,
|
||||
pool)
|
||||
segmentTypeSealed)
|
||||
assert.Nil(t, err)
|
||||
|
||||
t.Run("test id binlog row size", func(t *testing.T) {
|
||||
|
@ -660,8 +618,6 @@ func TestSegment_BasicMetrics(t *testing.T) {
|
|||
func TestSegment_fillIndexedFieldsData(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
pool, err := concurrency.NewPool(runtime.GOMAXPROCS(0))
|
||||
require.NoError(t, err)
|
||||
|
||||
schema := genTestCollectionSchema()
|
||||
collection := newCollection(defaultCollectionID, schema)
|
||||
|
@ -670,8 +626,7 @@ func TestSegment_fillIndexedFieldsData(t *testing.T) {
|
|||
defaultPartitionID,
|
||||
defaultCollectionID,
|
||||
defaultDMLChannel,
|
||||
segmentTypeSealed,
|
||||
pool)
|
||||
segmentTypeSealed)
|
||||
assert.Nil(t, err)
|
||||
|
||||
vecCM, err := genVectorChunkManager(ctx, collection)
|
||||
|
|
Loading…
Reference in New Issue