From 8216478b3d82d43291fc526a0c8737b98e8fca05 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 10 Sep 2021 09:33:43 +0800 Subject: [PATCH] Add replica and insertNode unittests for query node (#7638) Signed-off-by: bigsheeper --- internal/querynode/collection_replica_test.go | 17 +++ internal/querynode/flow_graph_insert_node.go | 4 - .../querynode/flow_graph_insert_node_test.go | 122 ++++++++++++++++++ 3 files changed, 139 insertions(+), 4 deletions(-) create mode 100644 internal/querynode/flow_graph_insert_node_test.go diff --git a/internal/querynode/collection_replica_test.go b/internal/querynode/collection_replica_test.go index 0e6e30561c..95ffad84a8 100644 --- a/internal/querynode/collection_replica_test.go +++ b/internal/querynode/collection_replica_test.go @@ -286,3 +286,20 @@ func TestCollectionReplica_freeAll(t *testing.T) { // err = node.Stop() // assert.NoError(t, err) //} + +func TestCollectionReplica_statistic(t *testing.T) { + t.Run("test getCollectionIDs", func(t *testing.T) { + replica, err := genSimpleReplica() + assert.NoError(t, err) + ids := replica.getCollectionIDs() + assert.Len(t, ids, 1) + assert.Equal(t, defaultCollectionID, ids[0]) + }) + + t.Run("test getCollectionIDs", func(t *testing.T) { + replica, err := genSimpleReplica() + assert.NoError(t, err) + num := replica.getSegmentNum() + assert.Equal(t, 0, num) + }) +} diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index ea8dacbfc8..394a7ac210 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -137,10 +137,6 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *sync.WaitGroup) { log.Debug("QueryNode::iNode::insert", zap.Any("SegmentID", segmentID)) var targetSegment, err = iNode.replica.getSegmentByID(segmentID) - if targetSegment.segmentType != segmentTypeGrowing { - wg.Done() - return - } if err != nil { log.Warn("cannot find segment:", zap.Int64("segmentID", segmentID)) // TODO: add error handling diff --git a/internal/querynode/flow_graph_insert_node_test.go b/internal/querynode/flow_graph_insert_node_test.go new file mode 100644 index 0000000000..5a5112dbf0 --- /dev/null +++ b/internal/querynode/flow_graph_insert_node_test.go @@ -0,0 +1,122 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package querynode + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/internal/msgstream" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/util/flowgraph" +) + +func genFlowGraphInsertData() (*InsertData, error) { + insertMsg, err := genSimpleInsertMsg() + if err != nil { + return nil, err + } + + insertData := &InsertData{ + insertIDs: map[UniqueID][]UniqueID{ + defaultSegmentID: insertMsg.RowIDs, + }, + insertTimestamps: map[UniqueID][]Timestamp{ + defaultSegmentID: insertMsg.Timestamps, + }, + insertRecords: map[UniqueID][]*commonpb.Blob{ + defaultSegmentID: insertMsg.RowData, + }, + insertOffset: map[UniqueID]int64{ + defaultSegmentID: 0, + }, + } + return insertData, nil +} + +func TestFlowGraphInsertNode_insert(t *testing.T) { + t.Run("test insert", func(t *testing.T) { + replica, err := genSimpleReplica() + assert.NoError(t, err) + insertNode := newInsertNode(replica) + + err = replica.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeGrowing, + true) + assert.NoError(t, err) + + insertData, err := genFlowGraphInsertData() + assert.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(1) + insertNode.insert(insertData, defaultSegmentID, wg) + }) + + t.Run("test no target segment", func(t *testing.T) { + replica, err := genSimpleReplica() + assert.NoError(t, err) + insertNode := newInsertNode(replica) + wg := &sync.WaitGroup{} + wg.Add(1) + insertNode.insert(nil, defaultSegmentID, wg) + }) + + t.Run("test invalid segmentType", func(t *testing.T) { + replica, err := genSimpleReplica() + assert.NoError(t, err) + insertNode := newInsertNode(replica) + + err = replica.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeSealed, + true) + assert.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(1) + insertNode.insert(nil, defaultSegmentID, wg) + }) +} + +func TestFlowGraphInsertNode_operate(t *testing.T) { + t.Run("test operate", func(t *testing.T) { + replica, err := genSimpleReplica() + assert.NoError(t, err) + insertNode := newInsertNode(replica) + + err = replica.addSegment(defaultSegmentID, + defaultPartitionID, + defaultCollectionID, + defaultVChannel, + segmentTypeGrowing, + true) + assert.NoError(t, err) + + msgInsertMsg, err := genSimpleInsertMsg() + assert.NoError(t, err) + iMsg := insertMsg{ + insertMessages: []*msgstream.InsertMsg{ + msgInsertMsg, + }, + } + msg := []flowgraph.Msg{&iMsg} + insertNode.Operate(msg) + }) +}