mirror of https://github.com/milvus-io/milvus.git
Use definitional type instead of raw type (#7797)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/7860/head
parent
27beb033de
commit
691e4c5bc9
|
@ -54,7 +54,7 @@ type ReplicaInterface interface {
|
|||
hasCollection(collectionID UniqueID) bool
|
||||
getCollectionNum() int
|
||||
getPartitionIDs(collectionID UniqueID) ([]UniqueID, error)
|
||||
getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error)
|
||||
getVecFieldIDsByCollectionID(collectionID UniqueID) ([]FieldID, error)
|
||||
|
||||
// partition
|
||||
addPartition(collectionID UniqueID, partitionID UniqueID) error
|
||||
|
@ -93,8 +93,6 @@ type collectionReplica struct {
|
|||
partitions map[UniqueID]*Partition
|
||||
segments map[UniqueID]*Segment
|
||||
|
||||
loadType
|
||||
|
||||
excludedSegments map[UniqueID][]*datapb.SegmentInfo // map[collectionID]segmentIDs
|
||||
|
||||
etcdKV *etcdkv.EtcdKV
|
||||
|
@ -203,7 +201,7 @@ func (colReplica *collectionReplica) getPartitionIDs(collectionID UniqueID) ([]U
|
|||
return collection.partitionIDs, nil
|
||||
}
|
||||
|
||||
func (colReplica *collectionReplica) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]int64, error) {
|
||||
func (colReplica *collectionReplica) getVecFieldIDsByCollectionID(collectionID UniqueID) ([]FieldID, error) {
|
||||
colReplica.mu.RLock()
|
||||
defer colReplica.mu.RUnlock()
|
||||
|
||||
|
@ -212,7 +210,7 @@ func (colReplica *collectionReplica) getVecFieldIDsByCollectionID(collectionID U
|
|||
return nil, err
|
||||
}
|
||||
|
||||
vecFields := make([]int64, 0)
|
||||
vecFields := make([]FieldID, 0)
|
||||
for _, field := range fields {
|
||||
if field.DataType == schemapb.DataType_BinaryVector || field.DataType == schemapb.DataType_FloatVector {
|
||||
vecFields = append(vecFields, field.FieldID)
|
||||
|
|
|
@ -255,38 +255,6 @@ func TestCollectionReplica_freeAll(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
//func TestReplaceGrowingSegmentBySealedSegment(t *testing.T) {
|
||||
// node := newQueryNodeMock()
|
||||
// collectionID := UniqueID(0)
|
||||
// segmentID := UniqueID(520)
|
||||
// initTestMeta(t, node, collectionID, segmentID)
|
||||
//
|
||||
// _, _, segIDs := node.historical.replica.getSegmentsBySegmentType(segmentTypeGrowing)
|
||||
// assert.Equal(t, len(segIDs), 1)
|
||||
//
|
||||
// collection, err := node.historical.replica.getCollectionByID(collectionID)
|
||||
// assert.NoError(t, err)
|
||||
// ns := newSegment(collection, segmentID, defaultPartitionID, collectionID, "", segmentTypeSealed, true)
|
||||
// err = node.historical.replica.replaceGrowingSegmentBySealedSegment(ns)
|
||||
// assert.NoError(t, err)
|
||||
//
|
||||
// segmentNums := node.historical.replica.getSegmentNum()
|
||||
// assert.Equal(t, segmentNums, 1)
|
||||
//
|
||||
// segment, err := node.historical.replica.getSegmentByID(segmentID)
|
||||
// assert.NoError(t, err)
|
||||
//
|
||||
// assert.Equal(t, segment.getType(), segmentTypeSealed)
|
||||
//
|
||||
// _, _, segIDs = node.historical.replica.getSegmentsBySegmentType(segmentTypeGrowing)
|
||||
// assert.Equal(t, len(segIDs), 0)
|
||||
// _, _, segIDs = node.historical.replica.getSegmentsBySegmentType(segmentTypeSealed)
|
||||
// assert.Equal(t, len(segIDs), 1)
|
||||
//
|
||||
// err = node.Stop()
|
||||
// assert.NoError(t, err)
|
||||
//}
|
||||
|
||||
func TestCollectionReplica_statistic(t *testing.T) {
|
||||
t.Run("test getCollectionIDs", func(t *testing.T) {
|
||||
replica, err := genSimpleReplica()
|
||||
|
|
|
@ -65,14 +65,14 @@ func TestDataSyncService_Start(t *testing.T) {
|
|||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_Insert,
|
||||
MsgID: 0,
|
||||
Timestamp: uint64(i + 1000),
|
||||
Timestamp: Timestamp(i + 1000),
|
||||
SourceID: 0,
|
||||
},
|
||||
CollectionID: collectionID,
|
||||
PartitionID: defaultPartitionID,
|
||||
SegmentID: int64(0),
|
||||
SegmentID: UniqueID(0),
|
||||
ChannelID: "0",
|
||||
Timestamps: []uint64{uint64(i + 1000), uint64(i + 1000)},
|
||||
Timestamps: []Timestamp{Timestamp(i + 1000), Timestamp(i + 1000)},
|
||||
RowIDs: []int64{int64(i), int64(i)},
|
||||
RowData: []*commonpb.Blob{
|
||||
{Value: rawData},
|
||||
|
|
|
@ -12,7 +12,6 @@
|
|||
package querynode
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/opentracing/opentracing-go"
|
||||
|
@ -30,8 +29,7 @@ type insertNode struct {
|
|||
}
|
||||
|
||||
type InsertData struct {
|
||||
insertContext map[int64]context.Context
|
||||
insertIDs map[UniqueID][]UniqueID
|
||||
insertIDs map[UniqueID][]int64
|
||||
insertTimestamps map[UniqueID][]Timestamp
|
||||
insertRecords map[UniqueID][]*commonpb.Blob
|
||||
insertOffset map[UniqueID]int64
|
||||
|
@ -56,10 +54,10 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
}
|
||||
|
||||
insertData := InsertData{
|
||||
insertIDs: make(map[int64][]int64),
|
||||
insertTimestamps: make(map[int64][]uint64),
|
||||
insertRecords: make(map[int64][]*commonpb.Blob),
|
||||
insertOffset: make(map[int64]int64),
|
||||
insertIDs: make(map[UniqueID][]int64),
|
||||
insertTimestamps: make(map[UniqueID][]Timestamp),
|
||||
insertRecords: make(map[UniqueID][]*commonpb.Blob),
|
||||
insertOffset: make(map[UniqueID]int64),
|
||||
}
|
||||
|
||||
if iMsg == nil {
|
||||
|
@ -134,7 +132,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
return []Msg{res}
|
||||
}
|
||||
|
||||
func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) {
|
||||
func (iNode *insertNode) insert(insertData *InsertData, segmentID UniqueID, wg *sync.WaitGroup) {
|
||||
log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID))
|
||||
var targetSegment, err = iNode.replica.getSegmentByID(segmentID)
|
||||
if err != nil {
|
||||
|
|
|
@ -46,7 +46,7 @@ type indexLoader struct {
|
|||
kv kv.BaseKV // minio kv
|
||||
}
|
||||
|
||||
func (loader *indexLoader) loadIndex(segment *Segment, fieldID int64) error {
|
||||
func (loader *indexLoader) loadIndex(segment *Segment, fieldID FieldID) error {
|
||||
// 1. use msg's index paths to get index bytes
|
||||
var err error
|
||||
var indexBuffer [][]byte
|
||||
|
|
|
@ -67,7 +67,7 @@ func (li *LoadIndexInfo) appendIndexParam(indexKey string, indexValue string) er
|
|||
return nil
|
||||
}
|
||||
|
||||
func (li *LoadIndexInfo) appendFieldInfo(fieldID int64) error {
|
||||
func (li *LoadIndexInfo) appendFieldInfo(fieldID FieldID) error {
|
||||
cFieldID := C.long(fieldID)
|
||||
status := C.AppendFieldInfo(li.cLoadIndexInfo, cFieldID)
|
||||
errorCode := status.error_code
|
||||
|
|
|
@ -114,7 +114,7 @@ func (pg *searchRequest) delete() {
|
|||
|
||||
type RetrievePlan struct {
|
||||
cRetrievePlan C.CRetrievePlan
|
||||
Timestamp uint64
|
||||
Timestamp Timestamp
|
||||
}
|
||||
|
||||
// func createRetrievePlan(col *Collection, msg *segcorepb.RetrieveRequest, timestamp uint64) (*RetrievePlan, error) {
|
||||
|
@ -132,7 +132,7 @@ type RetrievePlan struct {
|
|||
// return plan, nil
|
||||
// }
|
||||
|
||||
func createRetrievePlanByExpr(col *Collection, expr []byte, timestamp uint64) (*RetrievePlan, error) {
|
||||
func createRetrievePlanByExpr(col *Collection, expr []byte, timestamp Timestamp) (*RetrievePlan, error) {
|
||||
var cPlan C.CRetrievePlan
|
||||
status := C.CreateRetrievePlanByExpr(col.collectionPtr, (*C.char)(unsafe.Pointer(&expr[0])),
|
||||
(C.int64_t)(len(expr)), &cPlan)
|
||||
|
|
|
@ -85,7 +85,7 @@ type Segment struct {
|
|||
segmentType segmentType
|
||||
|
||||
paramMutex sync.RWMutex // guards index
|
||||
indexInfos map[int64]*indexInfo
|
||||
indexInfos map[FieldID]*indexInfo
|
||||
|
||||
idBinlogRowSizes []int64
|
||||
|
||||
|
@ -167,7 +167,7 @@ func (s *Segment) getVectorFieldInfo(fieldID UniqueID) (*VectorFieldInfo, error)
|
|||
return nil, errors.New("Invalid fieldID " + strconv.Itoa(int(fieldID)))
|
||||
}
|
||||
|
||||
func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, collectionID UniqueID, vChannelID Channel, segType segmentType, onService bool) *Segment {
|
||||
func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID, collectionID UniqueID, vChannelID Channel, segType segmentType, onService bool) *Segment {
|
||||
/*
|
||||
CSegmentInterface
|
||||
NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type);
|
||||
|
|
|
@ -164,7 +164,7 @@ func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, segment
|
|||
}
|
||||
}
|
||||
|
||||
indexedFieldIDs := make([]int64, 0)
|
||||
indexedFieldIDs := make([]FieldID, 0)
|
||||
for _, vecFieldID := range vectorFieldIDs {
|
||||
err = loader.indexLoader.setIndexInfo(collectionID, segment, vecFieldID)
|
||||
if err != nil {
|
||||
|
|
|
@ -93,7 +93,7 @@ func TestSegment_getRowCount(t *testing.T) {
|
|||
assert.Equal(t, segmentID, segment.segmentID)
|
||||
|
||||
ids := []int64{1, 2, 3}
|
||||
timestamps := []uint64{0, 0, 0}
|
||||
timestamps := []Timestamp{0, 0, 0}
|
||||
|
||||
const DIM = 16
|
||||
const N = 3
|
||||
|
@ -207,7 +207,7 @@ func TestSegment_retrieve(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
OutputFieldIds: []int64{101},
|
||||
OutputFieldIds: []FieldID{101},
|
||||
}
|
||||
// reqIds := &segcorepb.RetrieveRequest{
|
||||
// Ids: &schemapb.IDs{
|
||||
|
|
Loading…
Reference in New Issue