Format go code of reader

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2020-11-03 20:16:56 +08:00 committed by yefu.chen
parent d6fe379143
commit 5cc4f7d6be
23 changed files with 173 additions and 134 deletions

View File

@ -46,7 +46,7 @@ func (c *Collection) DeletePartition(node *QueryNode, partition *Partition) {
for _, p := range c.Partitions { for _, p := range c.Partitions {
if p.PartitionName == partition.PartitionName { if p.PartitionName == partition.PartitionName {
for _, s := range p.Segments { for _, s := range p.Segments {
delete(node.SegmentsMap, s.SegmentId) delete(node.SegmentsMap, s.SegmentID)
} }
} else { } else {
tmpPartitions = append(tmpPartitions, p) tmpPartitions = append(tmpPartitions, p)

View File

@ -1,8 +1,9 @@
package reader package reader
import ( import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"log" "log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
) )
type filteredDmNode struct { type filteredDmNode struct {

View File

@ -79,7 +79,7 @@ func (iNode *insertNode) getSegmentBySegmentID(segmentID int64) (*Segment, error
func (iNode *insertNode) insert(segmentID int64, wg *sync.WaitGroup) { func (iNode *insertNode) insert(segmentID int64, wg *sync.WaitGroup) {
var targetSegment, err = iNode.getSegmentBySegmentID(segmentID) var targetSegment, err = iNode.getSegmentBySegmentID(segmentID)
if err != nil { if err != nil {
log.Println("insert failed") log.Println("cannot find segment:", segmentID)
// TODO: add error handling // TODO: add error handling
return return
} }
@ -90,8 +90,13 @@ func (iNode *insertNode) insert(segmentID int64, wg *sync.WaitGroup) {
offsets := iNode.insertMsg.insertData.insertOffset[segmentID] offsets := iNode.insertMsg.insertData.insertOffset[segmentID]
err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, &records) err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, &records)
fmt.Println("Do insert done, len = ", len(iNode.insertMsg.insertData.insertIDs[segmentID])) if err != nil {
log.Println("insert failed")
// TODO: add error handling
return
}
fmt.Println("Do insert done, len = ", len(iNode.insertMsg.insertData.insertIDs[segmentID]))
wg.Done() wg.Done()
} }

View File

@ -3,11 +3,12 @@ package reader
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"sync"
"github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream"
msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message" msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
"log"
"sync"
) )
type manipulationService struct { type manipulationService struct {
@ -148,7 +149,7 @@ func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOr
} }
node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid) node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid)
node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp) node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp)
// node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob) // node.insertData.insertRecords[msg.SegmentID] = append(node.insertData.insertRecords[msg.SegmentID], msg.RowsData.Blob)
} else if msg.Op == msgPb.OpType_DELETE { } else if msg.Op == msgPb.OpType_DELETE {
var r = DeleteRecord{ var r = DeleteRecord{
entityID: msg.Uid, entityID: msg.Uid,
@ -202,7 +203,7 @@ func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOr
} }
node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid) node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid)
node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp) node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp)
// node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob) // node.insertData.insertRecords[msg.SegmentID] = append(node.insertData.insertRecords[msg.SegmentID], msg.RowsData.Blob)
} else if msg.Op == msgPb.OpType_DELETE { } else if msg.Op == msgPb.OpType_DELETE {
var r = DeleteRecord{ var r = DeleteRecord{
entityID: msg.Uid, entityID: msg.Uid,

View File

@ -93,7 +93,8 @@ func TestInsertAndDelete_WriterDelete(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"
@ -195,7 +196,8 @@ func TestInsertAndDelete_PreInsertAndDelete(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"
@ -327,7 +329,8 @@ func TestInsertAndDelete_DoInsert(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -417,7 +420,8 @@ func TestInsertAndDelete_DoDelete(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"
@ -536,7 +540,8 @@ func TestInsertAndDelete_DoInsertAndDelete(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"

View File

@ -23,12 +23,12 @@ const (
SegmentPrefix = "/segment/" SegmentPrefix = "/segment/"
) )
func GetCollectionObjId(key string) string { func GetCollectionObjID(key string) string {
prefix := path.Join(conf.Config.Etcd.Rootpath, CollectionPrefix) + "/" prefix := path.Join(conf.Config.Etcd.Rootpath, CollectionPrefix) + "/"
return strings.TrimPrefix(key, prefix) return strings.TrimPrefix(key, prefix)
} }
func GetSegmentObjId(key string) string { func GetSegmentObjID(key string) string {
prefix := path.Join(conf.Config.Etcd.Rootpath, SegmentPrefix) + "/" prefix := path.Join(conf.Config.Etcd.Rootpath, SegmentPrefix) + "/"
return strings.TrimPrefix(key, prefix) return strings.TrimPrefix(key, prefix)
} }
@ -133,10 +133,10 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
func (node *QueryNode) processCreate(key string, msg string) { func (node *QueryNode) processCreate(key string, msg string) {
println("process create", key) println("process create", key)
if isCollectionObj(key) { if isCollectionObj(key) {
objID := GetCollectionObjId(key) objID := GetCollectionObjID(key)
node.processCollectionCreate(objID, msg) node.processCollectionCreate(objID, msg)
} else if isSegmentObj(key) { } else if isSegmentObj(key) {
objID := GetSegmentObjId(key) objID := GetSegmentObjID(key)
node.processSegmentCreate(objID, msg) node.processSegmentCreate(objID, msg)
} else { } else {
println("can not process create msg:", key) println("can not process create msg:", key)
@ -170,10 +170,10 @@ func (node *QueryNode) processCollectionModify(id string, value string) {
func (node *QueryNode) processModify(key string, msg string) { func (node *QueryNode) processModify(key string, msg string) {
// println("process modify") // println("process modify")
if isCollectionObj(key) { if isCollectionObj(key) {
objID := GetCollectionObjId(key) objID := GetCollectionObjID(key)
node.processCollectionModify(objID, msg) node.processCollectionModify(objID, msg)
} else if isSegmentObj(key) { } else if isSegmentObj(key) {
objID := GetSegmentObjId(key) objID := GetSegmentObjID(key)
node.processSegmentModify(objID, msg) node.processSegmentModify(objID, msg)
} else { } else {
println("can not process modify msg:", key) println("can not process modify msg:", key)
@ -183,7 +183,7 @@ func (node *QueryNode) processModify(key string, msg string) {
func (node *QueryNode) processSegmentDelete(id string) { func (node *QueryNode) processSegmentDelete(id string) {
println("Delete segment: ", id) println("Delete segment: ", id)
segmentId, err := strconv.ParseInt(id, 10, 64) segmentID, err := strconv.ParseInt(id, 10, 64)
if err != nil { if err != nil {
log.Println("Cannot parse segment id:" + id) log.Println("Cannot parse segment id:" + id)
} }
@ -191,7 +191,7 @@ func (node *QueryNode) processSegmentDelete(id string) {
for _, col := range node.Collections { for _, col := range node.Collections {
for _, p := range col.Partitions { for _, p := range col.Partitions {
for _, s := range p.Segments { for _, s := range p.Segments {
if s.SegmentId == segmentId { if s.SegmentID == segmentID {
p.DeleteSegment(node, s) p.DeleteSegment(node, s)
} }
} }
@ -202,22 +202,22 @@ func (node *QueryNode) processSegmentDelete(id string) {
func (node *QueryNode) processCollectionDelete(id string) { func (node *QueryNode) processCollectionDelete(id string) {
println("Delete collection: ", id) println("Delete collection: ", id)
collectionId, err := strconv.ParseInt(id, 10, 64) collectionID, err := strconv.ParseInt(id, 10, 64)
if err != nil { if err != nil {
log.Println("Cannot parse collection id:" + id) log.Println("Cannot parse collection id:" + id)
} }
targetCollection := node.GetCollectionByID(collectionId) targetCollection := node.GetCollectionByID(collectionID)
node.DeleteCollection(targetCollection) node.DeleteCollection(targetCollection)
} }
func (node *QueryNode) processDelete(key string) { func (node *QueryNode) processDelete(key string) {
println("process delete") println("process delete")
if isCollectionObj(key) { if isCollectionObj(key) {
objID := GetCollectionObjId(key) objID := GetCollectionObjID(key)
node.processCollectionDelete(objID) node.processCollectionDelete(objID)
} else if isSegmentObj(key) { } else if isSegmentObj(key) {
objID := GetSegmentObjId(key) objID := GetSegmentObjID(key)
node.processSegmentDelete(objID) node.processSegmentDelete(objID)
} else { } else {
println("can not process delete msg:", key) println("can not process delete msg:", key)
@ -256,7 +256,7 @@ func (node *QueryNode) loadCollections() error {
return err return err
} }
for i := range keys { for i := range keys {
objID := GetCollectionObjId(keys[i]) objID := GetCollectionObjID(keys[i])
node.processCollectionCreate(objID, values[i]) node.processCollectionCreate(objID, values[i])
} }
return nil return nil
@ -267,7 +267,7 @@ func (node *QueryNode) loadSegments() error {
return err return err
} }
for i := range keys { for i := range keys {
objID := GetSegmentObjId(keys[i]) objID := GetSegmentObjID(keys[i])
node.processSegmentCreate(objID, values[i]) node.processSegmentCreate(objID, values[i])
} }
return nil return nil

View File

@ -20,28 +20,28 @@ func TestMeta_GetCollectionObjId(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
var key = "/collection/collection0" var key = "/collection/collection0"
var collectionObjId1 = GetCollectionObjId(key) var collectionObjID1 = GetCollectionObjID(key)
assert.Equal(t, collectionObjId1, "/collection/collection0") assert.Equal(t, collectionObjID1, "/collection/collection0")
key = "fakeKey" key = "fakeKey"
var collectionObjId2 = GetCollectionObjId(key) var collectionObjID2 = GetCollectionObjID(key)
assert.Equal(t, collectionObjId2, "fakeKey") assert.Equal(t, collectionObjID2, "fakeKey")
} }
func TestMeta_GetSegmentObjId(t *testing.T) { func TestMeta_GetSegmentObjId(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
var key = "/segment/segment0" var key = "/segment/segment0"
var segmentObjId1 = GetSegmentObjId(key) var segmentObjID1 = GetSegmentObjID(key)
assert.Equal(t, segmentObjId1, "/segment/segment0") assert.Equal(t, segmentObjID1, "/segment/segment0")
key = "fakeKey" key = "fakeKey"
var segmentObjId2 = GetSegmentObjId(key) var segmentObjID2 = GetSegmentObjID(key)
assert.Equal(t, segmentObjId2, "fakeKey") assert.Equal(t, segmentObjID2, "fakeKey")
} }
func TestMeta_isCollectionObj(t *testing.T) { func TestMeta_isCollectionObj(t *testing.T) {
@ -158,7 +158,8 @@ func TestMeta_ProcessCollectionCreate(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -185,7 +186,8 @@ func TestMeta_ProcessSegmentCreate(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -202,7 +204,7 @@ func TestMeta_ProcessSegmentCreate(t *testing.T) {
node.processSegmentCreate(id, value) node.processSegmentCreate(id, value)
s := node.SegmentsMap[int64(0)] s := node.SegmentsMap[int64(0)]
assert.Equal(t, s.SegmentId, int64(0)) assert.Equal(t, s.SegmentID, int64(0))
assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663))
assert.Equal(t, s.SegmentStatus, 0) assert.Equal(t, s.SegmentStatus, 0)
} }
@ -211,7 +213,8 @@ func TestMeta_ProcessCreate(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -242,7 +245,7 @@ func TestMeta_ProcessCreate(t *testing.T) {
node.processCreate(key2, msg2) node.processCreate(key2, msg2)
s := node.SegmentsMap[int64(0)] s := node.SegmentsMap[int64(0)]
assert.Equal(t, s.SegmentId, int64(0)) assert.Equal(t, s.SegmentID, int64(0))
assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663))
assert.Equal(t, s.SegmentStatus, 0) assert.Equal(t, s.SegmentStatus, 0)
} }
@ -251,7 +254,8 @@ func TestMeta_ProcessSegmentModify(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -268,7 +272,7 @@ func TestMeta_ProcessSegmentModify(t *testing.T) {
node.processSegmentCreate(id, value) node.processSegmentCreate(id, value)
var s = node.SegmentsMap[int64(0)] var s = node.SegmentsMap[int64(0)]
assert.Equal(t, s.SegmentId, int64(0)) assert.Equal(t, s.SegmentID, int64(0))
assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663))
assert.Equal(t, s.SegmentStatus, 0) assert.Equal(t, s.SegmentStatus, 0)
@ -280,7 +284,7 @@ func TestMeta_ProcessSegmentModify(t *testing.T) {
node.processSegmentModify(id, newValue) node.processSegmentModify(id, newValue)
s = node.SegmentsMap[int64(0)] s = node.SegmentsMap[int64(0)]
assert.Equal(t, s.SegmentId, int64(0)) assert.Equal(t, s.SegmentID, int64(0))
assert.Equal(t, s.SegmentCloseTime, uint64(70368744177888)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177888))
assert.Equal(t, s.SegmentStatus, 0) assert.Equal(t, s.SegmentStatus, 0)
} }
@ -289,7 +293,8 @@ func TestMeta_ProcessCollectionModify(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -333,7 +338,8 @@ func TestMeta_ProcessModify(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -364,7 +370,7 @@ func TestMeta_ProcessModify(t *testing.T) {
node.processCreate(key2, msg2) node.processCreate(key2, msg2)
s := node.SegmentsMap[int64(0)] s := node.SegmentsMap[int64(0)]
assert.Equal(t, s.SegmentId, int64(0)) assert.Equal(t, s.SegmentID, int64(0))
assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663))
assert.Equal(t, s.SegmentStatus, 0) assert.Equal(t, s.SegmentStatus, 0)
@ -394,7 +400,7 @@ func TestMeta_ProcessModify(t *testing.T) {
node.processModify(key2, msg4) node.processModify(key2, msg4)
s = node.SegmentsMap[int64(0)] s = node.SegmentsMap[int64(0)]
assert.Equal(t, s.SegmentId, int64(0)) assert.Equal(t, s.SegmentID, int64(0))
assert.Equal(t, s.SegmentCloseTime, uint64(70368744177888)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177888))
assert.Equal(t, s.SegmentStatus, 0) assert.Equal(t, s.SegmentStatus, 0)
} }
@ -403,7 +409,8 @@ func TestMeta_ProcessSegmentDelete(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -420,7 +427,7 @@ func TestMeta_ProcessSegmentDelete(t *testing.T) {
node.processSegmentCreate(id, value) node.processSegmentCreate(id, value)
s := node.SegmentsMap[int64(0)] s := node.SegmentsMap[int64(0)]
assert.Equal(t, s.SegmentId, int64(0)) assert.Equal(t, s.SegmentID, int64(0))
assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663))
assert.Equal(t, s.SegmentStatus, 0) assert.Equal(t, s.SegmentStatus, 0)
@ -434,7 +441,8 @@ func TestMeta_ProcessCollectionDelete(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -466,7 +474,8 @@ func TestMeta_ProcessDelete(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -497,7 +506,7 @@ func TestMeta_ProcessDelete(t *testing.T) {
node.processCreate(key2, msg2) node.processCreate(key2, msg2)
s := node.SegmentsMap[int64(0)] s := node.SegmentsMap[int64(0)]
assert.Equal(t, s.SegmentId, int64(0)) assert.Equal(t, s.SegmentID, int64(0))
assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663))
assert.Equal(t, s.SegmentStatus, 0) assert.Equal(t, s.SegmentStatus, 0)
@ -515,7 +524,8 @@ func TestMeta_ProcessResp(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -537,7 +547,8 @@ func TestMeta_LoadCollections(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -553,7 +564,8 @@ func TestMeta_LoadSegments(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -569,7 +581,8 @@ func TestMeta_InitFromMeta(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
node := CreateQueryNode(ctx, 0, 0, &mc) node := CreateQueryNode(ctx, 0, 0, &mc)
@ -582,7 +595,8 @@ func TestMeta_RunMetaService(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
node := CreateQueryNode(ctx, 0, 0, nil) node := CreateQueryNode(ctx, 0, 0, nil)

View File

@ -1,8 +1,9 @@
package reader package reader
import ( import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"log" "log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
) )
type msgStreamNode struct { type msgStreamNode struct {

View File

@ -19,14 +19,14 @@ type Partition struct {
Segments []*Segment Segments []*Segment
} }
func (p *Partition) NewSegment(segmentId int64) *Segment { func (p *Partition) NewSegment(segmentID int64) *Segment {
/* /*
CSegmentBase CSegmentBase
NewSegment(CPartition partition, unsigned long segment_id); NewSegment(CPartition partition, unsigned long segment_id);
*/ */
segmentPtr := C.NewSegment(p.PartitionPtr, C.ulong(segmentId)) segmentPtr := C.NewSegment(p.PartitionPtr, C.ulong(segmentID))
var newSegment = &Segment{SegmentPtr: segmentPtr, SegmentId: segmentId} var newSegment = &Segment{SegmentPtr: segmentPtr, SegmentID: segmentID}
p.Segments = append(p.Segments, newSegment) p.Segments = append(p.Segments, newSegment)
return newSegment return newSegment
} }
@ -42,8 +42,8 @@ func (p *Partition) DeleteSegment(node *QueryNode, segment *Segment) {
tmpSegments := make([]*Segment, 0) tmpSegments := make([]*Segment, 0)
for _, s := range p.Segments { for _, s := range p.Segments {
if s.SegmentId == segment.SegmentId { if s.SegmentID == segment.SegmentID {
delete(node.SegmentsMap, s.SegmentId) delete(node.SegmentsMap, s.SegmentID)
} else { } else {
tmpSegments = append(tmpSegments, s) tmpSegments = append(tmpSegments, s)
} }

View File

@ -20,13 +20,13 @@ func TestPartition_NewSegment(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, collection.CollectionID, uint64(0)) assert.Equal(t, collection.CollectionID, uint64(0))
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, node.Collections[0].Partitions[0].Segments[0].SegmentId, int64(0)) assert.Equal(t, node.Collections[0].Partitions[0].Segments[0].SegmentID, int64(0))
assert.Equal(t, len(collection.Partitions), 1) assert.Equal(t, len(collection.Partitions), 1)
assert.Equal(t, len(node.Collections), 1) assert.Equal(t, len(node.Collections), 1)
assert.Equal(t, len(node.Collections[0].Partitions[0].Segments), 1) assert.Equal(t, len(node.Collections[0].Partitions[0].Segments), 1)
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, node.FoundSegmentBySegmentID(int64(0)), true) assert.Equal(t, node.FoundSegmentBySegmentID(int64(0)), true)
} }
@ -44,13 +44,13 @@ func TestPartition_DeleteSegment(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, collection.CollectionID, uint64(0)) assert.Equal(t, collection.CollectionID, uint64(0))
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, node.Collections[0].Partitions[0].Segments[0].SegmentId, int64(0)) assert.Equal(t, node.Collections[0].Partitions[0].Segments[0].SegmentID, int64(0))
assert.Equal(t, len(collection.Partitions), 1) assert.Equal(t, len(collection.Partitions), 1)
assert.Equal(t, len(node.Collections), 1) assert.Equal(t, len(node.Collections), 1)
assert.Equal(t, len(node.Collections[0].Partitions[0].Segments), 1) assert.Equal(t, len(node.Collections[0].Partitions[0].Segments), 1)
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
// 2. Destruct collection, partition and segment // 2. Destruct collection, partition and segment
partition.DeleteSegment(node, segment) partition.DeleteSegment(node, segment)

View File

@ -85,7 +85,7 @@ type QueryNode struct {
// context // context
ctx context.Context ctx context.Context
QueryNodeId uint64 QueryNodeID uint64
Collections []*Collection Collections []*Collection
SegmentsMap map[int64]*Segment SegmentsMap map[int64]*Segment
messageClient *msgclient.ReaderMessageClient messageClient *msgclient.ReaderMessageClient
@ -99,7 +99,7 @@ type QueryNode struct {
InsertLogs []InsertLog InsertLogs []InsertLog
} }
func NewQueryNode(ctx context.Context, queryNodeId uint64, timeSync uint64) *QueryNode { func NewQueryNode(ctx context.Context, queryNodeID uint64, timeSync uint64) *QueryNode {
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
queryNodeTimeSync := &QueryNodeTime{ queryNodeTimeSync := &QueryNodeTime{
@ -127,7 +127,7 @@ func NewQueryNode(ctx context.Context, queryNodeId uint64, timeSync uint64) *Que
return &QueryNode{ return &QueryNode{
ctx: ctx, ctx: ctx,
QueryNodeId: queryNodeId, QueryNodeID: queryNodeID,
Collections: nil, Collections: nil,
SegmentsMap: segmentsMap, SegmentsMap: segmentsMap,
messageClient: &mc, messageClient: &mc,
@ -146,7 +146,7 @@ func (node *QueryNode) Close() {
} }
} }
func CreateQueryNode(ctx context.Context, queryNodeId uint64, timeSync uint64, mc *msgclient.ReaderMessageClient) *QueryNode { func CreateQueryNode(ctx context.Context, queryNodeID uint64, timeSync uint64, mc *msgclient.ReaderMessageClient) *QueryNode {
queryNodeTimeSync := &QueryNodeTime{ queryNodeTimeSync := &QueryNodeTime{
ReadTimeSyncMin: timeSync, ReadTimeSyncMin: timeSync,
ReadTimeSyncMax: timeSync, ReadTimeSyncMax: timeSync,
@ -175,7 +175,7 @@ func CreateQueryNode(ctx context.Context, queryNodeId uint64, timeSync uint64, m
return &QueryNode{ return &QueryNode{
ctx: ctx, ctx: ctx,
QueryNodeId: queryNodeId, QueryNodeID: queryNodeID,
Collections: nil, Collections: nil,
SegmentsMap: segmentsMap, SegmentsMap: segmentsMap,
messageClient: mc, messageClient: mc,
@ -202,7 +202,7 @@ func (node *QueryNode) QueryNodeDataInit() {
insertIDs: make(map[int64][]int64), insertIDs: make(map[int64][]int64),
insertTimestamps: make(map[int64][]uint64), insertTimestamps: make(map[int64][]uint64),
// insertRecords: make(map[int64][][]byte), // insertRecords: make(map[int64][][]byte),
insertOffset: make(map[int64]int64), insertOffset: make(map[int64]int64),
} }
node.deletePreprocessData = deletePreprocessData node.deletePreprocessData = deletePreprocessData
@ -235,7 +235,7 @@ func (node *QueryNode) DeleteCollection(collection *Collection) {
if col.CollectionID == collectionID { if col.CollectionID == collectionID {
for _, p := range collection.Partitions { for _, p := range collection.Partitions {
for _, s := range p.Segments { for _, s := range p.Segments {
delete(node.SegmentsMap, s.SegmentId) delete(node.SegmentsMap, s.SegmentID)
} }
} }
} else { } else {

View File

@ -10,7 +10,8 @@ import (
func TestQueryNode_CreateQueryNode(t *testing.T) { func TestQueryNode_CreateQueryNode(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
ctx, _ := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node := CreateQueryNode(ctx, 0, 0, nil) node := CreateQueryNode(ctx, 0, 0, nil)
assert.NotNil(t, node) assert.NotNil(t, node)
@ -18,7 +19,8 @@ func TestQueryNode_CreateQueryNode(t *testing.T) {
func TestQueryNode_NewQueryNode(t *testing.T) { func TestQueryNode_NewQueryNode(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
ctx, _ := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node := NewQueryNode(ctx, 0, 0) node := NewQueryNode(ctx, 0, 0)
assert.NotNil(t, node) assert.NotNil(t, node)
@ -26,7 +28,8 @@ func TestQueryNode_NewQueryNode(t *testing.T) {
func TestQueryNode_Close(t *testing.T) { func TestQueryNode_Close(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
ctx, _ := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node := CreateQueryNode(ctx, 0, 0, nil) node := CreateQueryNode(ctx, 0, 0, nil)
assert.NotNil(t, node) assert.NotNil(t, node)
@ -36,7 +39,8 @@ func TestQueryNode_Close(t *testing.T) {
func TestQueryNode_QueryNodeDataInit(t *testing.T) { func TestQueryNode_QueryNodeDataInit(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
ctx, _ := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node := CreateQueryNode(ctx, 0, 0, nil) node := CreateQueryNode(ctx, 0, 0, nil)
assert.NotNil(t, node) assert.NotNil(t, node)
@ -50,7 +54,8 @@ func TestQueryNode_QueryNodeDataInit(t *testing.T) {
func TestQueryNode_NewCollection(t *testing.T) { func TestQueryNode_NewCollection(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
ctx, _ := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node := CreateQueryNode(ctx, 0, 0, nil) node := CreateQueryNode(ctx, 0, 0, nil)
assert.NotNil(t, node) assert.NotNil(t, node)
@ -63,7 +68,8 @@ func TestQueryNode_NewCollection(t *testing.T) {
func TestQueryNode_DeleteCollection(t *testing.T) { func TestQueryNode_DeleteCollection(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
ctx, _ := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
node := CreateQueryNode(ctx, 0, 0, nil) node := CreateQueryNode(ctx, 0, 0, nil)
assert.NotNil(t, node) assert.NotNil(t, node)

View File

@ -103,7 +103,6 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
} }
} }
} }
wg.Done()
} }
func (node *QueryNode) RunSearch(wg *sync.WaitGroup) { func (node *QueryNode) RunSearch(wg *sync.WaitGroup) {
@ -129,5 +128,4 @@ func (node *QueryNode) RunSearch(wg *sync.WaitGroup) {
default: default:
} }
} }
wg.Done()
} }

View File

@ -19,7 +19,8 @@ func TestReader_startQueryNode(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address pulsarAddr += conf.Config.Pulsar.Address
@ -37,7 +38,8 @@ func TestReader_RunInsertDelete(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"
@ -67,7 +69,8 @@ func TestReader_RunSearch(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"

View File

@ -20,7 +20,8 @@ func TestResult_PublishSearchResult(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"
@ -61,7 +62,8 @@ func TestResult_PublishFailedSearchResult(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"
@ -89,7 +91,8 @@ func TestResult_PublicStatistic(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"

View File

@ -11,7 +11,7 @@ import (
func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
type SearchResultTmp struct { type SearchResultTmp struct {
ResultId int64 ResultID int64
ResultDistance float32 ResultDistance float32
} }
@ -20,7 +20,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
// Traverse all messages in the current messageClient. // Traverse all messages in the current messageClient.
// TODO: Do not receive batched search requests // TODO: Do not receive batched search requests
for _, msg := range searchMessages { for _, msg := range searchMessages {
var clientId = msg.ClientId var clientID = msg.ClientId
var searchTimestamp = msg.Timestamp var searchTimestamp = msg.Timestamp
// ServiceTimeSync update by TimeSync, which is get from proxy. // ServiceTimeSync update by TimeSync, which is get from proxy.
@ -34,7 +34,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
var vector = msg.Records var vector = msg.Records
// We now only the first Json is valid. // We now only the first Json is valid.
var queryJson = msg.Json[0] var queryJSON = msg.Json[0]
// 1. Timestamp check // 1. Timestamp check
// TODO: return or wait? Or adding graceful time // TODO: return or wait? Or adding graceful time
@ -44,7 +44,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
} }
// 2. Get query information from query json // 2. Get query information from query json
query := node.QueryJson2Info(&queryJson) query := node.QueryJSON2Info(&queryJSON)
// 2d slice for receiving multiple queries's results // 2d slice for receiving multiple queries's results
var resultsTmp = make([][]SearchResultTmp, query.NumQueries) var resultsTmp = make([][]SearchResultTmp, query.NumQueries)
for i := 0; i < int(query.NumQueries); i++ { for i := 0; i < int(query.NumQueries); i++ {
@ -58,7 +58,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
continue continue
} }
//fmt.Println("Search in segment:", segment.SegmentId, ",segment rows:", segment.GetRowCount()) //fmt.Println("Search in segment:", segment.SegmentID, ",segment rows:", segment.GetRowCount())
var res, err = segment.SegmentSearch(query, searchTimestamp, vector) var res, err = segment.SegmentSearch(query, searchTimestamp, vector)
if err != nil { if err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
@ -68,7 +68,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
for i := 0; i < int(query.NumQueries); i++ { for i := 0; i < int(query.NumQueries); i++ {
for j := i * query.TopK; j < (i+1)*query.TopK; j++ { for j := i * query.TopK; j < (i+1)*query.TopK; j++ {
resultsTmp[i] = append(resultsTmp[i], SearchResultTmp{ resultsTmp[i] = append(resultsTmp[i], SearchResultTmp{
ResultId: res.ResultIds[j], ResultID: res.ResultIds[j],
ResultDistance: res.ResultDistances[j], ResultDistance: res.ResultDistances[j],
}) })
} }
@ -98,11 +98,11 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
Entities: &entities, Entities: &entities,
Distances: make([]float32, 0), Distances: make([]float32, 0),
QueryId: msg.Uid, QueryId: msg.Uid,
ProxyId: clientId, ProxyId: clientID,
} }
for _, rTmp := range resultsTmp { for _, rTmp := range resultsTmp {
for _, res := range rTmp { for _, res := range rTmp {
results.Entities.Ids = append(results.Entities.Ids, res.ResultId) results.Entities.Ids = append(results.Entities.Ids, res.ResultID)
results.Distances = append(results.Distances, res.ResultDistance) results.Distances = append(results.Distances, res.ResultDistance)
results.Scores = append(results.Distances, float32(0)) results.Scores = append(results.Distances, float32(0))
} }

View File

@ -2,6 +2,7 @@ package reader
import ( import (
"context" "context"
"github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream"
) )

View File

@ -18,7 +18,8 @@ import (
func TestSearch_Search(t *testing.T) { func TestSearch_Search(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
ctx, _ := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
@ -114,7 +115,7 @@ func TestSearch_Search(t *testing.T) {
queryRawData = append(queryRawData, float32(i)) queryRawData = append(queryRawData, float32(i))
} }
var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}" var queryJSON = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}"
searchMsg1 := msgPb.SearchMsg{ searchMsg1 := msgPb.SearchMsg{
CollectionName: "collection0", CollectionName: "collection0",
Records: &msgPb.VectorRowRecord{ Records: &msgPb.VectorRowRecord{
@ -125,7 +126,7 @@ func TestSearch_Search(t *testing.T) {
Timestamp: uint64(0), Timestamp: uint64(0),
ClientId: int64(0), ClientId: int64(0),
ExtraParams: nil, ExtraParams: nil,
Json: []string{queryJson}, Json: []string{queryJSON},
} }
searchMessages := []*msgPb.SearchMsg{&searchMsg1} searchMessages := []*msgPb.SearchMsg{&searchMsg1}

View File

@ -30,7 +30,7 @@ const (
type Segment struct { type Segment struct {
SegmentPtr C.CSegmentBase SegmentPtr C.CSegmentBase
SegmentId int64 SegmentID int64
SegmentCloseTime uint64 SegmentCloseTime uint64
LastMemSize int64 LastMemSize int64
SegmentStatus int SegmentStatus int
@ -72,7 +72,7 @@ func (s *Segment) GetDeletedCount() int64 {
// int // int
// Close(CSegmentBase c_segment); // Close(CSegmentBase c_segment);
// */ // */
// fmt.Println("Closing segment :", s.SegmentId) // fmt.Println("Closing segment :", s.SegmentID)
// //
// var status = C.Close(s.SegmentPtr) // var status = C.Close(s.SegmentPtr)
// s.SegmentStatus = SegmentClosed // s.SegmentStatus = SegmentClosed
@ -226,9 +226,9 @@ func (s *Segment) SegmentSearch(query *QueryInfo, timestamp uint64, vectorRecord
var cQueryRawDataLength C.int var cQueryRawDataLength C.int
if vectorRecord.BinaryData != nil { if vectorRecord.BinaryData != nil {
return nil, errors.New("Data of binary type is not supported yet") return nil, errors.New("data of binary type is not supported yet")
} else if len(vectorRecord.FloatData) <= 0 { } else if len(vectorRecord.FloatData) <= 0 {
return nil, errors.New("Null query vector data") return nil, errors.New("null query vector data")
} else { } else {
cQueryRawData = (*C.float)(&vectorRecord.FloatData[0]) cQueryRawData = (*C.float)(&vectorRecord.FloatData[0])
cQueryRawDataLength = (C.int)(len(vectorRecord.FloatData)) cQueryRawDataLength = (C.int)(len(vectorRecord.FloatData))

View File

@ -44,7 +44,8 @@ import (
func TestSegmentManagement_SegmentStatistic(t *testing.T) { func TestSegmentManagement_SegmentStatistic(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
ctx, _ := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"
@ -73,7 +74,8 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"

View File

@ -23,7 +23,7 @@ func TestSegment_ConstructorAndDestructor(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, len(node.SegmentsMap), 1) assert.Equal(t, len(node.SegmentsMap), 1)
// 2. Destruct collection, partition and segment // 2. Destruct collection, partition and segment
@ -49,7 +49,7 @@ func TestSegment_SegmentInsert(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, len(node.SegmentsMap), 1) assert.Equal(t, len(node.SegmentsMap), 1)
// 2. Create ids and timestamps // 2. Create ids and timestamps
@ -107,7 +107,7 @@ func TestSegment_SegmentDelete(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, len(node.SegmentsMap), 1) assert.Equal(t, len(node.SegmentsMap), 1)
// 2. Create ids and timestamps // 2. Create ids and timestamps
@ -145,7 +145,7 @@ func TestSegment_SegmentSearch(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, len(node.SegmentsMap), 1) assert.Equal(t, len(node.SegmentsMap), 1)
// 2. Create ids and timestamps // 2. Create ids and timestamps
@ -183,7 +183,7 @@ func TestSegment_SegmentSearch(t *testing.T) {
//assert.NoError(t, err) //assert.NoError(t, err)
// 6. Do search // 6. Do search
var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}" var queryJSON = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}"
var queryRawData = make([]float32, 0) var queryRawData = make([]float32, 0)
for i := 0; i < 16; i++ { for i := 0; i < 16; i++ {
queryRawData = append(queryRawData, float32(i)) queryRawData = append(queryRawData, float32(i))
@ -191,7 +191,7 @@ func TestSegment_SegmentSearch(t *testing.T) {
var vectorRecord = msgPb.VectorRowRecord{ var vectorRecord = msgPb.VectorRowRecord{
FloatData: queryRawData, FloatData: queryRawData,
} }
query := node.QueryJson2Info(&queryJson) query := node.QueryJSON2Info(&queryJSON)
var searchRes, searchErr = segment.SegmentSearch(query, timestamps[N/2], &vectorRecord) var searchRes, searchErr = segment.SegmentSearch(query, timestamps[N/2], &vectorRecord)
assert.NoError(t, searchErr) assert.NoError(t, searchErr)
fmt.Println(searchRes) fmt.Println(searchRes)
@ -219,7 +219,7 @@ func TestSegment_SegmentPreInsert(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, len(node.SegmentsMap), 1) assert.Equal(t, len(node.SegmentsMap), 1)
// 2. Do PreInsert // 2. Do PreInsert
@ -249,7 +249,7 @@ func TestSegment_SegmentPreDelete(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, len(node.SegmentsMap), 1) assert.Equal(t, len(node.SegmentsMap), 1)
// 2. Do PreDelete // 2. Do PreDelete
@ -321,7 +321,7 @@ func TestSegment_GetRowCount(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, len(node.SegmentsMap), 1) assert.Equal(t, len(node.SegmentsMap), 1)
// 2. Create ids and timestamps // 2. Create ids and timestamps
@ -383,7 +383,7 @@ func TestSegment_GetDeletedCount(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, len(node.SegmentsMap), 1) assert.Equal(t, len(node.SegmentsMap), 1)
// 2. Create ids and timestamps // 2. Create ids and timestamps
@ -426,7 +426,7 @@ func TestSegment_GetMemSize(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, len(node.SegmentsMap), 1) assert.Equal(t, len(node.SegmentsMap), 1)
// 2. Create ids and timestamps // 2. Create ids and timestamps
@ -496,7 +496,7 @@ func TestSegment_RealSchemaTest(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, len(node.SegmentsMap), 1) assert.Equal(t, len(node.SegmentsMap), 1)
// 2. Create ids and timestamps // 2. Create ids and timestamps

View File

@ -66,13 +66,9 @@ func (node *QueryNode) GetSegmentBySegmentID(segmentID int64) (*Segment, error)
} }
func (node *QueryNode) FoundSegmentBySegmentID(segmentID int64) bool { func (node *QueryNode) FoundSegmentBySegmentID(segmentID int64) bool {
targetSegment := node.SegmentsMap[segmentID] _, ok := node.SegmentsMap[segmentID]
if targetSegment == nil { return ok
return false
}
return true
} }
func (c *Collection) GetPartitionByName(partitionName string) (partition *Partition) { func (c *Collection) GetPartitionByName(partitionName string) (partition *Partition) {
@ -111,12 +107,12 @@ func (node *QueryNode) WriteQueryLog() {
// write logs // write logs
for _, insertLog := range node.InsertLogs { for _, insertLog := range node.InsertLogs {
insertLogJson, err := json.Marshal(&insertLog) insertLogJSON, err := json.Marshal(&insertLog)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
writeString := string(insertLogJson) + "\n" writeString := string(insertLogJSON) + "\n"
fmt.Println(writeString) fmt.Println(writeString)
_, err2 := f.WriteString(writeString) _, err2 := f.WriteString(writeString)
@ -141,9 +137,9 @@ func (node *QueryNode) PrepareBatchMsg() []int {
return msgLen return msgLen
} }
func (node *QueryNode) QueryJson2Info(queryJson *string) *QueryInfo { func (node *QueryNode) QueryJSON2Info(queryJSON *string) *QueryInfo {
var query QueryInfo var query QueryInfo
var err = json.Unmarshal([]byte(*queryJson), &query) var err = json.Unmarshal([]byte(*queryJSON), &query)
if err != nil { if err != nil {
log.Fatal("Unmarshal query json failed") log.Fatal("Unmarshal query json failed")

View File

@ -18,7 +18,8 @@ func TestUtilFunctions_GetKey2Segments(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
ctx, _ := context.WithDeadline(context.Background(), d) ctx, cancel := context.WithDeadline(context.Background(), d)
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"
@ -65,7 +66,7 @@ func TestUtilFunctions_GetCollectionByID(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, len(node.SegmentsMap), 1) assert.Equal(t, len(node.SegmentsMap), 1)
c := node.GetCollectionByID(int64(0)) c := node.GetCollectionByID(int64(0))
@ -112,7 +113,7 @@ func TestUtilFunctions_GetSegmentBySegmentID(t *testing.T) {
// 2. Get segment by segment id // 2. Get segment by segment id
var s0, err = node.GetSegmentBySegmentID(0) var s0, err = node.GetSegmentBySegmentID(0)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, s0.SegmentId, int64(0)) assert.Equal(t, s0.SegmentID, int64(0))
node.Close() node.Close()
} }
@ -129,7 +130,7 @@ func TestUtilFunctions_FoundSegmentBySegmentID(t *testing.T) {
assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionName, "collection0")
assert.Equal(t, partition.PartitionName, "partition0") assert.Equal(t, partition.PartitionName, "partition0")
assert.Equal(t, segment.SegmentId, int64(0)) assert.Equal(t, segment.SegmentID, int64(0))
assert.Equal(t, len(node.SegmentsMap), 1) assert.Equal(t, len(node.SegmentsMap), 1)
b1 := node.FoundSegmentBySegmentID(int64(0)) b1 := node.FoundSegmentBySegmentID(int64(0))
@ -168,7 +169,8 @@ func TestUtilFunctions_GetPartitionByName(t *testing.T) {
func TestUtilFunctions_PrepareBatchMsg(t *testing.T) { func TestUtilFunctions_PrepareBatchMsg(t *testing.T) {
conf.LoadConfig("config.yaml") conf.LoadConfig("config.yaml")
ctx, _ := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mc := msgclient.ReaderMessageClient{} mc := msgclient.ReaderMessageClient{}
pulsarAddr := "pulsar://" pulsarAddr := "pulsar://"
@ -189,8 +191,8 @@ func TestUtilFunctions_QueryJson2Info(t *testing.T) {
ctx := context.Background() ctx := context.Background()
node := NewQueryNode(ctx, 0, 0) node := NewQueryNode(ctx, 0, 0)
var queryJson = "{\"field_name\":\"age\",\"num_queries\":1,\"topK\":10}" var queryJSON = "{\"field_name\":\"age\",\"num_queries\":1,\"topK\":10}"
info := node.QueryJson2Info(&queryJson) info := node.QueryJSON2Info(&queryJSON)
assert.Equal(t, info.FieldName, "age") assert.Equal(t, info.FieldName, "age")
assert.Equal(t, info.NumQueries, int64(1)) assert.Equal(t, info.NumQueries, int64(1))