diff --git a/internal/reader/collection.go b/internal/reader/collection.go index 317f010e2c..e87dc70b5e 100644 --- a/internal/reader/collection.go +++ b/internal/reader/collection.go @@ -46,7 +46,7 @@ func (c *Collection) DeletePartition(node *QueryNode, partition *Partition) { for _, p := range c.Partitions { if p.PartitionName == partition.PartitionName { for _, s := range p.Segments { - delete(node.SegmentsMap, s.SegmentId) + delete(node.SegmentsMap, s.SegmentID) } } else { tmpPartitions = append(tmpPartitions, p) diff --git a/internal/reader/filtered_dm_node.go b/internal/reader/filtered_dm_node.go index d632266c22..ddae4135eb 100644 --- a/internal/reader/filtered_dm_node.go +++ b/internal/reader/filtered_dm_node.go @@ -1,8 +1,9 @@ package reader import ( - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "log" + + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" ) type filteredDmNode struct { diff --git a/internal/reader/insert_node.go b/internal/reader/insert_node.go index af5b3abb7e..4d86051385 100644 --- a/internal/reader/insert_node.go +++ b/internal/reader/insert_node.go @@ -79,7 +79,7 @@ func (iNode *insertNode) getSegmentBySegmentID(segmentID int64) (*Segment, error func (iNode *insertNode) insert(segmentID int64, wg *sync.WaitGroup) { var targetSegment, err = iNode.getSegmentBySegmentID(segmentID) if err != nil { - log.Println("insert failed") + log.Println("cannot find segment:", segmentID) // TODO: add error handling return } @@ -90,8 +90,13 @@ func (iNode *insertNode) insert(segmentID int64, wg *sync.WaitGroup) { offsets := iNode.insertMsg.insertData.insertOffset[segmentID] err = targetSegment.SegmentInsert(offsets, &ids, ×tamps, &records) - fmt.Println("Do insert done, len = ", len(iNode.insertMsg.insertData.insertIDs[segmentID])) + if err != nil { + log.Println("insert failed") + // TODO: add error handling + return + } + fmt.Println("Do insert done, len = ", len(iNode.insertMsg.insertData.insertIDs[segmentID])) wg.Done() } diff --git a/internal/reader/manipulation_service.go b/internal/reader/manipulation_service.go index e042ef74fe..2aa760d525 100644 --- a/internal/reader/manipulation_service.go +++ b/internal/reader/manipulation_service.go @@ -3,11 +3,12 @@ package reader import ( "context" "fmt" + "log" + "sync" + "github.com/zilliztech/milvus-distributed/internal/msgstream" msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message" "github.com/zilliztech/milvus-distributed/internal/util/flowgraph" - "log" - "sync" ) type manipulationService struct { @@ -148,7 +149,7 @@ func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOr } node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid) node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp) - // node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob) + // node.insertData.insertRecords[msg.SegmentID] = append(node.insertData.insertRecords[msg.SegmentID], msg.RowsData.Blob) } else if msg.Op == msgPb.OpType_DELETE { var r = DeleteRecord{ entityID: msg.Uid, @@ -202,7 +203,7 @@ func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOr } node.insertData.insertIDs[msg.SegmentId] = append(node.insertData.insertIDs[msg.SegmentId], msg.Uid) node.insertData.insertTimestamps[msg.SegmentId] = append(node.insertData.insertTimestamps[msg.SegmentId], msg.Timestamp) - // node.insertData.insertRecords[msg.SegmentId] = append(node.insertData.insertRecords[msg.SegmentId], msg.RowsData.Blob) + // node.insertData.insertRecords[msg.SegmentID] = append(node.insertData.insertRecords[msg.SegmentID], msg.RowsData.Blob) } else if msg.Op == msgPb.OpType_DELETE { var r = DeleteRecord{ entityID: msg.Uid, diff --git a/internal/reader/manipulation_service_test.go b/internal/reader/manipulation_service_test.go index 45980e49d2..41ec81e82f 100644 --- a/internal/reader/manipulation_service_test.go +++ b/internal/reader/manipulation_service_test.go @@ -93,7 +93,8 @@ func TestInsertAndDelete_WriterDelete(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" @@ -195,7 +196,8 @@ func TestInsertAndDelete_PreInsertAndDelete(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" @@ -327,7 +329,8 @@ func TestInsertAndDelete_DoInsert(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -417,7 +420,8 @@ func TestInsertAndDelete_DoDelete(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" @@ -536,7 +540,8 @@ func TestInsertAndDelete_DoInsertAndDelete(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" diff --git a/internal/reader/meta.go b/internal/reader/meta.go index dbfb9e8e46..ac681a9738 100644 --- a/internal/reader/meta.go +++ b/internal/reader/meta.go @@ -23,12 +23,12 @@ const ( SegmentPrefix = "/segment/" ) -func GetCollectionObjId(key string) string { +func GetCollectionObjID(key string) string { prefix := path.Join(conf.Config.Etcd.Rootpath, CollectionPrefix) + "/" return strings.TrimPrefix(key, prefix) } -func GetSegmentObjId(key string) string { +func GetSegmentObjID(key string) string { prefix := path.Join(conf.Config.Etcd.Rootpath, SegmentPrefix) + "/" return strings.TrimPrefix(key, prefix) } @@ -133,10 +133,10 @@ func (node *QueryNode) processSegmentCreate(id string, value string) { func (node *QueryNode) processCreate(key string, msg string) { println("process create", key) if isCollectionObj(key) { - objID := GetCollectionObjId(key) + objID := GetCollectionObjID(key) node.processCollectionCreate(objID, msg) } else if isSegmentObj(key) { - objID := GetSegmentObjId(key) + objID := GetSegmentObjID(key) node.processSegmentCreate(objID, msg) } else { println("can not process create msg:", key) @@ -170,10 +170,10 @@ func (node *QueryNode) processCollectionModify(id string, value string) { func (node *QueryNode) processModify(key string, msg string) { // println("process modify") if isCollectionObj(key) { - objID := GetCollectionObjId(key) + objID := GetCollectionObjID(key) node.processCollectionModify(objID, msg) } else if isSegmentObj(key) { - objID := GetSegmentObjId(key) + objID := GetSegmentObjID(key) node.processSegmentModify(objID, msg) } else { println("can not process modify msg:", key) @@ -183,7 +183,7 @@ func (node *QueryNode) processModify(key string, msg string) { func (node *QueryNode) processSegmentDelete(id string) { println("Delete segment: ", id) - segmentId, err := strconv.ParseInt(id, 10, 64) + segmentID, err := strconv.ParseInt(id, 10, 64) if err != nil { log.Println("Cannot parse segment id:" + id) } @@ -191,7 +191,7 @@ func (node *QueryNode) processSegmentDelete(id string) { for _, col := range node.Collections { for _, p := range col.Partitions { for _, s := range p.Segments { - if s.SegmentId == segmentId { + if s.SegmentID == segmentID { p.DeleteSegment(node, s) } } @@ -202,22 +202,22 @@ func (node *QueryNode) processSegmentDelete(id string) { func (node *QueryNode) processCollectionDelete(id string) { println("Delete collection: ", id) - collectionId, err := strconv.ParseInt(id, 10, 64) + collectionID, err := strconv.ParseInt(id, 10, 64) if err != nil { log.Println("Cannot parse collection id:" + id) } - targetCollection := node.GetCollectionByID(collectionId) + targetCollection := node.GetCollectionByID(collectionID) node.DeleteCollection(targetCollection) } func (node *QueryNode) processDelete(key string) { println("process delete") if isCollectionObj(key) { - objID := GetCollectionObjId(key) + objID := GetCollectionObjID(key) node.processCollectionDelete(objID) } else if isSegmentObj(key) { - objID := GetSegmentObjId(key) + objID := GetSegmentObjID(key) node.processSegmentDelete(objID) } else { println("can not process delete msg:", key) @@ -256,7 +256,7 @@ func (node *QueryNode) loadCollections() error { return err } for i := range keys { - objID := GetCollectionObjId(keys[i]) + objID := GetCollectionObjID(keys[i]) node.processCollectionCreate(objID, values[i]) } return nil @@ -267,7 +267,7 @@ func (node *QueryNode) loadSegments() error { return err } for i := range keys { - objID := GetSegmentObjId(keys[i]) + objID := GetSegmentObjID(keys[i]) node.processSegmentCreate(objID, values[i]) } return nil diff --git a/internal/reader/meta_test.go b/internal/reader/meta_test.go index c001f8ac74..facef2a9e1 100644 --- a/internal/reader/meta_test.go +++ b/internal/reader/meta_test.go @@ -20,28 +20,28 @@ func TestMeta_GetCollectionObjId(t *testing.T) { conf.LoadConfig("config.yaml") var key = "/collection/collection0" - var collectionObjId1 = GetCollectionObjId(key) + var collectionObjID1 = GetCollectionObjID(key) - assert.Equal(t, collectionObjId1, "/collection/collection0") + assert.Equal(t, collectionObjID1, "/collection/collection0") key = "fakeKey" - var collectionObjId2 = GetCollectionObjId(key) + var collectionObjID2 = GetCollectionObjID(key) - assert.Equal(t, collectionObjId2, "fakeKey") + assert.Equal(t, collectionObjID2, "fakeKey") } func TestMeta_GetSegmentObjId(t *testing.T) { conf.LoadConfig("config.yaml") var key = "/segment/segment0" - var segmentObjId1 = GetSegmentObjId(key) + var segmentObjID1 = GetSegmentObjID(key) - assert.Equal(t, segmentObjId1, "/segment/segment0") + assert.Equal(t, segmentObjID1, "/segment/segment0") key = "fakeKey" - var segmentObjId2 = GetSegmentObjId(key) + var segmentObjID2 = GetSegmentObjID(key) - assert.Equal(t, segmentObjId2, "fakeKey") + assert.Equal(t, segmentObjID2, "fakeKey") } func TestMeta_isCollectionObj(t *testing.T) { @@ -158,7 +158,8 @@ func TestMeta_ProcessCollectionCreate(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -185,7 +186,8 @@ func TestMeta_ProcessSegmentCreate(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -202,7 +204,7 @@ func TestMeta_ProcessSegmentCreate(t *testing.T) { node.processSegmentCreate(id, value) s := node.SegmentsMap[int64(0)] - assert.Equal(t, s.SegmentId, int64(0)) + assert.Equal(t, s.SegmentID, int64(0)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) assert.Equal(t, s.SegmentStatus, 0) } @@ -211,7 +213,8 @@ func TestMeta_ProcessCreate(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -242,7 +245,7 @@ func TestMeta_ProcessCreate(t *testing.T) { node.processCreate(key2, msg2) s := node.SegmentsMap[int64(0)] - assert.Equal(t, s.SegmentId, int64(0)) + assert.Equal(t, s.SegmentID, int64(0)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) assert.Equal(t, s.SegmentStatus, 0) } @@ -251,7 +254,8 @@ func TestMeta_ProcessSegmentModify(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -268,7 +272,7 @@ func TestMeta_ProcessSegmentModify(t *testing.T) { node.processSegmentCreate(id, value) var s = node.SegmentsMap[int64(0)] - assert.Equal(t, s.SegmentId, int64(0)) + assert.Equal(t, s.SegmentID, int64(0)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) assert.Equal(t, s.SegmentStatus, 0) @@ -280,7 +284,7 @@ func TestMeta_ProcessSegmentModify(t *testing.T) { node.processSegmentModify(id, newValue) s = node.SegmentsMap[int64(0)] - assert.Equal(t, s.SegmentId, int64(0)) + assert.Equal(t, s.SegmentID, int64(0)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177888)) assert.Equal(t, s.SegmentStatus, 0) } @@ -289,7 +293,8 @@ func TestMeta_ProcessCollectionModify(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -333,7 +338,8 @@ func TestMeta_ProcessModify(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -364,7 +370,7 @@ func TestMeta_ProcessModify(t *testing.T) { node.processCreate(key2, msg2) s := node.SegmentsMap[int64(0)] - assert.Equal(t, s.SegmentId, int64(0)) + assert.Equal(t, s.SegmentID, int64(0)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) assert.Equal(t, s.SegmentStatus, 0) @@ -394,7 +400,7 @@ func TestMeta_ProcessModify(t *testing.T) { node.processModify(key2, msg4) s = node.SegmentsMap[int64(0)] - assert.Equal(t, s.SegmentId, int64(0)) + assert.Equal(t, s.SegmentID, int64(0)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177888)) assert.Equal(t, s.SegmentStatus, 0) } @@ -403,7 +409,8 @@ func TestMeta_ProcessSegmentDelete(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -420,7 +427,7 @@ func TestMeta_ProcessSegmentDelete(t *testing.T) { node.processSegmentCreate(id, value) s := node.SegmentsMap[int64(0)] - assert.Equal(t, s.SegmentId, int64(0)) + assert.Equal(t, s.SegmentID, int64(0)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) assert.Equal(t, s.SegmentStatus, 0) @@ -434,7 +441,8 @@ func TestMeta_ProcessCollectionDelete(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -466,7 +474,8 @@ func TestMeta_ProcessDelete(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -497,7 +506,7 @@ func TestMeta_ProcessDelete(t *testing.T) { node.processCreate(key2, msg2) s := node.SegmentsMap[int64(0)] - assert.Equal(t, s.SegmentId, int64(0)) + assert.Equal(t, s.SegmentID, int64(0)) assert.Equal(t, s.SegmentCloseTime, uint64(70368744177663)) assert.Equal(t, s.SegmentStatus, 0) @@ -515,7 +524,8 @@ func TestMeta_ProcessResp(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -537,7 +547,8 @@ func TestMeta_LoadCollections(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -553,7 +564,8 @@ func TestMeta_LoadSegments(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -569,7 +581,8 @@ func TestMeta_InitFromMeta(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} node := CreateQueryNode(ctx, 0, 0, &mc) @@ -582,7 +595,8 @@ func TestMeta_RunMetaService(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() node := CreateQueryNode(ctx, 0, 0, nil) diff --git a/internal/reader/msg_stream_node.go b/internal/reader/msg_stream_node.go index 2bc3a4e5fc..c6fdbc95fa 100644 --- a/internal/reader/msg_stream_node.go +++ b/internal/reader/msg_stream_node.go @@ -1,8 +1,9 @@ package reader import ( - "github.com/zilliztech/milvus-distributed/internal/msgstream" "log" + + "github.com/zilliztech/milvus-distributed/internal/msgstream" ) type msgStreamNode struct { diff --git a/internal/reader/partition.go b/internal/reader/partition.go index a628b1a7f8..7d1d2f6593 100644 --- a/internal/reader/partition.go +++ b/internal/reader/partition.go @@ -19,14 +19,14 @@ type Partition struct { Segments []*Segment } -func (p *Partition) NewSegment(segmentId int64) *Segment { +func (p *Partition) NewSegment(segmentID int64) *Segment { /* CSegmentBase NewSegment(CPartition partition, unsigned long segment_id); */ - segmentPtr := C.NewSegment(p.PartitionPtr, C.ulong(segmentId)) + segmentPtr := C.NewSegment(p.PartitionPtr, C.ulong(segmentID)) - var newSegment = &Segment{SegmentPtr: segmentPtr, SegmentId: segmentId} + var newSegment = &Segment{SegmentPtr: segmentPtr, SegmentID: segmentID} p.Segments = append(p.Segments, newSegment) return newSegment } @@ -42,8 +42,8 @@ func (p *Partition) DeleteSegment(node *QueryNode, segment *Segment) { tmpSegments := make([]*Segment, 0) for _, s := range p.Segments { - if s.SegmentId == segment.SegmentId { - delete(node.SegmentsMap, s.SegmentId) + if s.SegmentID == segment.SegmentID { + delete(node.SegmentsMap, s.SegmentID) } else { tmpSegments = append(tmpSegments, s) } diff --git a/internal/reader/partition_test.go b/internal/reader/partition_test.go index d6c44871f2..d1c94ca53c 100644 --- a/internal/reader/partition_test.go +++ b/internal/reader/partition_test.go @@ -20,13 +20,13 @@ func TestPartition_NewSegment(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionID, uint64(0)) assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, node.Collections[0].Partitions[0].Segments[0].SegmentId, int64(0)) + assert.Equal(t, node.Collections[0].Partitions[0].Segments[0].SegmentID, int64(0)) assert.Equal(t, len(collection.Partitions), 1) assert.Equal(t, len(node.Collections), 1) assert.Equal(t, len(node.Collections[0].Partitions[0].Segments), 1) - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, node.FoundSegmentBySegmentID(int64(0)), true) } @@ -44,13 +44,13 @@ func TestPartition_DeleteSegment(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, collection.CollectionID, uint64(0)) assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, node.Collections[0].Partitions[0].Segments[0].SegmentId, int64(0)) + assert.Equal(t, node.Collections[0].Partitions[0].Segments[0].SegmentID, int64(0)) assert.Equal(t, len(collection.Partitions), 1) assert.Equal(t, len(node.Collections), 1) assert.Equal(t, len(node.Collections[0].Partitions[0].Segments), 1) - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) // 2. Destruct collection, partition and segment partition.DeleteSegment(node, segment) diff --git a/internal/reader/query_node.go b/internal/reader/query_node.go index 78011586b7..eed03da861 100644 --- a/internal/reader/query_node.go +++ b/internal/reader/query_node.go @@ -85,7 +85,7 @@ type QueryNode struct { // context ctx context.Context - QueryNodeId uint64 + QueryNodeID uint64 Collections []*Collection SegmentsMap map[int64]*Segment messageClient *msgclient.ReaderMessageClient @@ -99,7 +99,7 @@ type QueryNode struct { InsertLogs []InsertLog } -func NewQueryNode(ctx context.Context, queryNodeId uint64, timeSync uint64) *QueryNode { +func NewQueryNode(ctx context.Context, queryNodeID uint64, timeSync uint64) *QueryNode { mc := msgclient.ReaderMessageClient{} queryNodeTimeSync := &QueryNodeTime{ @@ -127,7 +127,7 @@ func NewQueryNode(ctx context.Context, queryNodeId uint64, timeSync uint64) *Que return &QueryNode{ ctx: ctx, - QueryNodeId: queryNodeId, + QueryNodeID: queryNodeID, Collections: nil, SegmentsMap: segmentsMap, messageClient: &mc, @@ -146,7 +146,7 @@ func (node *QueryNode) Close() { } } -func CreateQueryNode(ctx context.Context, queryNodeId uint64, timeSync uint64, mc *msgclient.ReaderMessageClient) *QueryNode { +func CreateQueryNode(ctx context.Context, queryNodeID uint64, timeSync uint64, mc *msgclient.ReaderMessageClient) *QueryNode { queryNodeTimeSync := &QueryNodeTime{ ReadTimeSyncMin: timeSync, ReadTimeSyncMax: timeSync, @@ -175,7 +175,7 @@ func CreateQueryNode(ctx context.Context, queryNodeId uint64, timeSync uint64, m return &QueryNode{ ctx: ctx, - QueryNodeId: queryNodeId, + QueryNodeID: queryNodeID, Collections: nil, SegmentsMap: segmentsMap, messageClient: mc, @@ -202,7 +202,7 @@ func (node *QueryNode) QueryNodeDataInit() { insertIDs: make(map[int64][]int64), insertTimestamps: make(map[int64][]uint64), // insertRecords: make(map[int64][][]byte), - insertOffset: make(map[int64]int64), + insertOffset: make(map[int64]int64), } node.deletePreprocessData = deletePreprocessData @@ -235,7 +235,7 @@ func (node *QueryNode) DeleteCollection(collection *Collection) { if col.CollectionID == collectionID { for _, p := range collection.Partitions { for _, s := range p.Segments { - delete(node.SegmentsMap, s.SegmentId) + delete(node.SegmentsMap, s.SegmentID) } } } else { diff --git a/internal/reader/quety_node_test.go b/internal/reader/query_node_test.go similarity index 78% rename from internal/reader/quety_node_test.go rename to internal/reader/query_node_test.go index 09eaf56f4e..7d153ce9fd 100644 --- a/internal/reader/quety_node_test.go +++ b/internal/reader/query_node_test.go @@ -10,7 +10,8 @@ import ( func TestQueryNode_CreateQueryNode(t *testing.T) { conf.LoadConfig("config.yaml") - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() node := CreateQueryNode(ctx, 0, 0, nil) assert.NotNil(t, node) @@ -18,7 +19,8 @@ func TestQueryNode_CreateQueryNode(t *testing.T) { func TestQueryNode_NewQueryNode(t *testing.T) { conf.LoadConfig("config.yaml") - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() node := NewQueryNode(ctx, 0, 0) assert.NotNil(t, node) @@ -26,7 +28,8 @@ func TestQueryNode_NewQueryNode(t *testing.T) { func TestQueryNode_Close(t *testing.T) { conf.LoadConfig("config.yaml") - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() node := CreateQueryNode(ctx, 0, 0, nil) assert.NotNil(t, node) @@ -36,7 +39,8 @@ func TestQueryNode_Close(t *testing.T) { func TestQueryNode_QueryNodeDataInit(t *testing.T) { conf.LoadConfig("config.yaml") - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() node := CreateQueryNode(ctx, 0, 0, nil) assert.NotNil(t, node) @@ -50,7 +54,8 @@ func TestQueryNode_QueryNodeDataInit(t *testing.T) { func TestQueryNode_NewCollection(t *testing.T) { conf.LoadConfig("config.yaml") - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() node := CreateQueryNode(ctx, 0, 0, nil) assert.NotNil(t, node) @@ -63,7 +68,8 @@ func TestQueryNode_NewCollection(t *testing.T) { func TestQueryNode_DeleteCollection(t *testing.T) { conf.LoadConfig("config.yaml") - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() node := CreateQueryNode(ctx, 0, 0, nil) assert.NotNil(t, node) diff --git a/internal/reader/reader.go b/internal/reader/reader.go index 06330ffad2..70fb97fa95 100644 --- a/internal/reader/reader.go +++ b/internal/reader/reader.go @@ -103,7 +103,6 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) { } } } - wg.Done() } func (node *QueryNode) RunSearch(wg *sync.WaitGroup) { @@ -129,5 +128,4 @@ func (node *QueryNode) RunSearch(wg *sync.WaitGroup) { default: } } - wg.Done() } diff --git a/internal/reader/reader_test.go b/internal/reader/reader_test.go index 39b9ae757a..ee95accc93 100644 --- a/internal/reader/reader_test.go +++ b/internal/reader/reader_test.go @@ -19,7 +19,8 @@ func TestReader_startQueryNode(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() pulsarAddr := "pulsar://" pulsarAddr += conf.Config.Pulsar.Address @@ -37,7 +38,8 @@ func TestReader_RunInsertDelete(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" @@ -67,7 +69,8 @@ func TestReader_RunSearch(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" diff --git a/internal/reader/result_test.go b/internal/reader/result_test.go index 081873f5bb..01a40cd834 100644 --- a/internal/reader/result_test.go +++ b/internal/reader/result_test.go @@ -20,7 +20,8 @@ func TestResult_PublishSearchResult(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" @@ -61,7 +62,8 @@ func TestResult_PublishFailedSearchResult(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" @@ -89,7 +91,8 @@ func TestResult_PublicStatistic(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" diff --git a/internal/reader/search.go b/internal/reader/search.go index 8c4934ca63..ff17df33f8 100644 --- a/internal/reader/search.go +++ b/internal/reader/search.go @@ -11,7 +11,7 @@ import ( func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { type SearchResultTmp struct { - ResultId int64 + ResultID int64 ResultDistance float32 } @@ -20,7 +20,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { // Traverse all messages in the current messageClient. // TODO: Do not receive batched search requests for _, msg := range searchMessages { - var clientId = msg.ClientId + var clientID = msg.ClientId var searchTimestamp = msg.Timestamp // ServiceTimeSync update by TimeSync, which is get from proxy. @@ -34,7 +34,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { var vector = msg.Records // We now only the first Json is valid. - var queryJson = msg.Json[0] + var queryJSON = msg.Json[0] // 1. Timestamp check // TODO: return or wait? Or adding graceful time @@ -44,7 +44,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { } // 2. Get query information from query json - query := node.QueryJson2Info(&queryJson) + query := node.QueryJSON2Info(&queryJSON) // 2d slice for receiving multiple queries's results var resultsTmp = make([][]SearchResultTmp, query.NumQueries) for i := 0; i < int(query.NumQueries); i++ { @@ -58,7 +58,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { continue } - //fmt.Println("Search in segment:", segment.SegmentId, ",segment rows:", segment.GetRowCount()) + //fmt.Println("Search in segment:", segment.SegmentID, ",segment rows:", segment.GetRowCount()) var res, err = segment.SegmentSearch(query, searchTimestamp, vector) if err != nil { fmt.Println(err.Error()) @@ -68,7 +68,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { for i := 0; i < int(query.NumQueries); i++ { for j := i * query.TopK; j < (i+1)*query.TopK; j++ { resultsTmp[i] = append(resultsTmp[i], SearchResultTmp{ - ResultId: res.ResultIds[j], + ResultID: res.ResultIds[j], ResultDistance: res.ResultDistances[j], }) } @@ -98,11 +98,11 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status { Entities: &entities, Distances: make([]float32, 0), QueryId: msg.Uid, - ProxyId: clientId, + ProxyId: clientID, } for _, rTmp := range resultsTmp { for _, res := range rTmp { - results.Entities.Ids = append(results.Entities.Ids, res.ResultId) + results.Entities.Ids = append(results.Entities.Ids, res.ResultID) results.Distances = append(results.Distances, res.ResultDistance) results.Scores = append(results.Distances, float32(0)) } diff --git a/internal/reader/search_service.go b/internal/reader/search_service.go index db9668c412..a15aad39de 100644 --- a/internal/reader/search_service.go +++ b/internal/reader/search_service.go @@ -2,6 +2,7 @@ package reader import ( "context" + "github.com/zilliztech/milvus-distributed/internal/msgstream" ) diff --git a/internal/reader/search_test.go b/internal/reader/search_test.go index 089e84071f..300b66fff5 100644 --- a/internal/reader/search_test.go +++ b/internal/reader/search_test.go @@ -18,7 +18,8 @@ import ( func TestSearch_Search(t *testing.T) { conf.LoadConfig("config.yaml") - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() mc := msgclient.ReaderMessageClient{} @@ -114,7 +115,7 @@ func TestSearch_Search(t *testing.T) { queryRawData = append(queryRawData, float32(i)) } - var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}" + var queryJSON = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}" searchMsg1 := msgPb.SearchMsg{ CollectionName: "collection0", Records: &msgPb.VectorRowRecord{ @@ -125,7 +126,7 @@ func TestSearch_Search(t *testing.T) { Timestamp: uint64(0), ClientId: int64(0), ExtraParams: nil, - Json: []string{queryJson}, + Json: []string{queryJSON}, } searchMessages := []*msgPb.SearchMsg{&searchMsg1} diff --git a/internal/reader/segment.go b/internal/reader/segment.go index 17f48f6d57..1bacbc7a4e 100644 --- a/internal/reader/segment.go +++ b/internal/reader/segment.go @@ -30,7 +30,7 @@ const ( type Segment struct { SegmentPtr C.CSegmentBase - SegmentId int64 + SegmentID int64 SegmentCloseTime uint64 LastMemSize int64 SegmentStatus int @@ -72,7 +72,7 @@ func (s *Segment) GetDeletedCount() int64 { // int // Close(CSegmentBase c_segment); // */ -// fmt.Println("Closing segment :", s.SegmentId) +// fmt.Println("Closing segment :", s.SegmentID) // // var status = C.Close(s.SegmentPtr) // s.SegmentStatus = SegmentClosed @@ -226,9 +226,9 @@ func (s *Segment) SegmentSearch(query *QueryInfo, timestamp uint64, vectorRecord var cQueryRawDataLength C.int if vectorRecord.BinaryData != nil { - return nil, errors.New("Data of binary type is not supported yet") + return nil, errors.New("data of binary type is not supported yet") } else if len(vectorRecord.FloatData) <= 0 { - return nil, errors.New("Null query vector data") + return nil, errors.New("null query vector data") } else { cQueryRawData = (*C.float)(&vectorRecord.FloatData[0]) cQueryRawDataLength = (C.int)(len(vectorRecord.FloatData)) diff --git a/internal/reader/segment_service_test.go b/internal/reader/segment_service_test.go index 0cde5da7aa..1dafcbf0f5 100644 --- a/internal/reader/segment_service_test.go +++ b/internal/reader/segment_service_test.go @@ -44,7 +44,8 @@ import ( func TestSegmentManagement_SegmentStatistic(t *testing.T) { conf.LoadConfig("config.yaml") - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" @@ -73,7 +74,8 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" diff --git a/internal/reader/segment_test.go b/internal/reader/segment_test.go index ea413b0f7b..c4cb6f86de 100644 --- a/internal/reader/segment_test.go +++ b/internal/reader/segment_test.go @@ -23,7 +23,7 @@ func TestSegment_ConstructorAndDestructor(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) // 2. Destruct collection, partition and segment @@ -49,7 +49,7 @@ func TestSegment_SegmentInsert(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) // 2. Create ids and timestamps @@ -107,7 +107,7 @@ func TestSegment_SegmentDelete(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) // 2. Create ids and timestamps @@ -145,7 +145,7 @@ func TestSegment_SegmentSearch(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) // 2. Create ids and timestamps @@ -183,7 +183,7 @@ func TestSegment_SegmentSearch(t *testing.T) { //assert.NoError(t, err) // 6. Do search - var queryJson = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}" + var queryJSON = "{\"field_name\":\"fakevec\",\"num_queries\":1,\"topK\":10}" var queryRawData = make([]float32, 0) for i := 0; i < 16; i++ { queryRawData = append(queryRawData, float32(i)) @@ -191,7 +191,7 @@ func TestSegment_SegmentSearch(t *testing.T) { var vectorRecord = msgPb.VectorRowRecord{ FloatData: queryRawData, } - query := node.QueryJson2Info(&queryJson) + query := node.QueryJSON2Info(&queryJSON) var searchRes, searchErr = segment.SegmentSearch(query, timestamps[N/2], &vectorRecord) assert.NoError(t, searchErr) fmt.Println(searchRes) @@ -219,7 +219,7 @@ func TestSegment_SegmentPreInsert(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) // 2. Do PreInsert @@ -249,7 +249,7 @@ func TestSegment_SegmentPreDelete(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) // 2. Do PreDelete @@ -321,7 +321,7 @@ func TestSegment_GetRowCount(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) // 2. Create ids and timestamps @@ -383,7 +383,7 @@ func TestSegment_GetDeletedCount(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) // 2. Create ids and timestamps @@ -426,7 +426,7 @@ func TestSegment_GetMemSize(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) // 2. Create ids and timestamps @@ -496,7 +496,7 @@ func TestSegment_RealSchemaTest(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) // 2. Create ids and timestamps diff --git a/internal/reader/util_functions.go b/internal/reader/util_functions.go index 5ffaf4ac0f..3d1c34d5f7 100644 --- a/internal/reader/util_functions.go +++ b/internal/reader/util_functions.go @@ -66,13 +66,9 @@ func (node *QueryNode) GetSegmentBySegmentID(segmentID int64) (*Segment, error) } func (node *QueryNode) FoundSegmentBySegmentID(segmentID int64) bool { - targetSegment := node.SegmentsMap[segmentID] + _, ok := node.SegmentsMap[segmentID] - if targetSegment == nil { - return false - } - - return true + return ok } func (c *Collection) GetPartitionByName(partitionName string) (partition *Partition) { @@ -111,12 +107,12 @@ func (node *QueryNode) WriteQueryLog() { // write logs for _, insertLog := range node.InsertLogs { - insertLogJson, err := json.Marshal(&insertLog) + insertLogJSON, err := json.Marshal(&insertLog) if err != nil { log.Fatal(err) } - writeString := string(insertLogJson) + "\n" + writeString := string(insertLogJSON) + "\n" fmt.Println(writeString) _, err2 := f.WriteString(writeString) @@ -141,9 +137,9 @@ func (node *QueryNode) PrepareBatchMsg() []int { return msgLen } -func (node *QueryNode) QueryJson2Info(queryJson *string) *QueryInfo { +func (node *QueryNode) QueryJSON2Info(queryJSON *string) *QueryInfo { var query QueryInfo - var err = json.Unmarshal([]byte(*queryJson), &query) + var err = json.Unmarshal([]byte(*queryJSON), &query) if err != nil { log.Fatal("Unmarshal query json failed") diff --git a/internal/reader/util_functions_test.go b/internal/reader/util_functions_test.go index bf193dccea..2f9bc3bf6a 100644 --- a/internal/reader/util_functions_test.go +++ b/internal/reader/util_functions_test.go @@ -18,7 +18,8 @@ func TestUtilFunctions_GetKey2Segments(t *testing.T) { conf.LoadConfig("config.yaml") d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) - ctx, _ := context.WithDeadline(context.Background(), d) + ctx, cancel := context.WithDeadline(context.Background(), d) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" @@ -65,7 +66,7 @@ func TestUtilFunctions_GetCollectionByID(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) c := node.GetCollectionByID(int64(0)) @@ -112,7 +113,7 @@ func TestUtilFunctions_GetSegmentBySegmentID(t *testing.T) { // 2. Get segment by segment id var s0, err = node.GetSegmentBySegmentID(0) assert.NoError(t, err) - assert.Equal(t, s0.SegmentId, int64(0)) + assert.Equal(t, s0.SegmentID, int64(0)) node.Close() } @@ -129,7 +130,7 @@ func TestUtilFunctions_FoundSegmentBySegmentID(t *testing.T) { assert.Equal(t, collection.CollectionName, "collection0") assert.Equal(t, partition.PartitionName, "partition0") - assert.Equal(t, segment.SegmentId, int64(0)) + assert.Equal(t, segment.SegmentID, int64(0)) assert.Equal(t, len(node.SegmentsMap), 1) b1 := node.FoundSegmentBySegmentID(int64(0)) @@ -168,7 +169,8 @@ func TestUtilFunctions_GetPartitionByName(t *testing.T) { func TestUtilFunctions_PrepareBatchMsg(t *testing.T) { conf.LoadConfig("config.yaml") - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() mc := msgclient.ReaderMessageClient{} pulsarAddr := "pulsar://" @@ -189,8 +191,8 @@ func TestUtilFunctions_QueryJson2Info(t *testing.T) { ctx := context.Background() node := NewQueryNode(ctx, 0, 0) - var queryJson = "{\"field_name\":\"age\",\"num_queries\":1,\"topK\":10}" - info := node.QueryJson2Info(&queryJson) + var queryJSON = "{\"field_name\":\"age\",\"num_queries\":1,\"topK\":10}" + info := node.QueryJSON2Info(&queryJSON) assert.Equal(t, info.FieldName, "age") assert.Equal(t, info.NumQueries, int64(1))