Add loader and streaming unittests for query node (#7762)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/7766/head
bigsheeper 2021-09-11 17:56:01 +08:00 committed by GitHub
parent 88358f409b
commit ab7ded2202
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 394 additions and 49 deletions

View File

@ -511,20 +511,31 @@ func genKey(collectionID, partitionID, segmentID UniqueID, fieldID int64) string
return path.Join(ids...)
}
func genSimpleStorageBlob() ([]*storage.Blob, error) {
collMeta := genSimpleCollectionMeta()
func genStorageBlob(collectionID UniqueID,
partitionID UniqueID,
segmentID UniqueID,
msgLength int,
schema *schemapb.CollectionSchema) ([]*storage.Blob, error) {
collMeta := genCollectionMeta(collectionID, schema)
inCodec := storage.NewInsertCodec(collMeta)
insertData, err := genSimpleInsertData()
insertData, err := genInsertData(msgLength, schema)
if err != nil {
return nil, err
}
// timestamp field not allowed 0 timestamp
insertData.Data[timestampFieldID].(*storage.Int64FieldData).Data[0] = 1
binLogs, _, err := inCodec.Serialize(defaultPartitionID, defaultSegmentID, insertData)
if _, ok := insertData.Data[timestampFieldID]; ok {
insertData.Data[timestampFieldID].(*storage.Int64FieldData).Data[0] = 1
}
binLogs, _, err := inCodec.Serialize(partitionID, segmentID, insertData)
return binLogs, err
}
// genSimpleStorageBlob generates storage blobs for the default
// collection/partition/segment using the simple test schema.
// Unlike the previous version, a schema-generation failure is now
// propagated instead of being silently discarded.
func genSimpleStorageBlob() ([]*storage.Blob, error) {
	schema, err := genSimpleSchema()
	if err != nil {
		return nil, err
	}
	return genStorageBlob(defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
}
func genSimpleFloatVectors() []float32 {
vec := make([]float32, defaultDim)
for i := 0; i < defaultDim; i++ {
@ -587,8 +598,17 @@ func genSimpleCommonBlob() ([]*commonpb.Blob, error) {
return genCommonBlob(defaultMsgLength, schema)
}
func saveSimpleBinLog(ctx context.Context) ([]*datapb.FieldBinlog, error) {
binLogs, err := genSimpleStorageBlob()
func saveBinLog(ctx context.Context,
collectionID UniqueID,
partitionID UniqueID,
segmentID UniqueID,
msgLength int,
schema *schemapb.CollectionSchema) ([]*datapb.FieldBinlog, error) {
binLogs, err := genStorageBlob(collectionID,
partitionID,
segmentID,
msgLength,
schema)
if err != nil {
return nil, err
}
@ -605,7 +625,7 @@ func saveSimpleBinLog(ctx context.Context) ([]*datapb.FieldBinlog, error) {
return nil, err
}
key := genKey(defaultCollectionID, defaultPartitionID, defaultSegmentID, fieldID)
key := genKey(collectionID, partitionID, segmentID, fieldID)
kvs[key] = string(blob.Value[:])
fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
FieldID: fieldID,
@ -622,6 +642,11 @@ func saveSimpleBinLog(ctx context.Context) ([]*datapb.FieldBinlog, error) {
return fieldBinlog, err
}
// saveSimpleBinLog persists binlogs for the default
// collection/partition/segment using the simple test schema and returns
// their field-binlog descriptors. A schema-generation failure is now
// propagated instead of being silently discarded.
func saveSimpleBinLog(ctx context.Context) ([]*datapb.FieldBinlog, error) {
	schema, err := genSimpleSchema()
	if err != nil {
		return nil, err
	}
	return saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
}
func genSimpleTimestampFieldData() []Timestamp {
times := make([]Timestamp, defaultMsgLength)
for i := 0; i < defaultMsgLength; i++ {

View File

@ -13,13 +13,16 @@ package querynode
import (
"context"
"fmt"
"math/rand"
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -37,33 +40,72 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
kv, err := genEtcdKV()
assert.NoError(t, err)
loader := newSegmentLoader(ctx, nil, nil, historical.replica, kv)
assert.NotNil(t, loader)
schema, _ := genSimpleSchema()
fieldBinlog, err := saveSimpleBinLog(ctx)
assert.NoError(t, err)
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
NodeID: 0,
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
t.Run("test no segment meta", func(t *testing.T) {
loader := newSegmentLoader(ctx, nil, nil, historical.replica, kv)
assert.NotNil(t, loader)
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
},
}
err = loader.loadSegment(req, true)
assert.Error(t, err)
NodeID: 0,
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
},
},
}
key := fmt.Sprintf("%s/%d", queryCoordSegmentMetaPrefix, defaultSegmentID)
err = kv.Remove(key)
assert.NoError(t, err)
err = loader.loadSegment(req, true)
assert.Error(t, err)
})
t.Run("test load segment", func(t *testing.T) {
loader := newSegmentLoader(ctx, nil, nil, historical.replica, kv)
assert.NotNil(t, loader)
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
NodeID: 0,
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
},
},
}
key := fmt.Sprintf("%s/%d", queryCoordSegmentMetaPrefix, defaultSegmentID)
segmentInfo := &querypb.SegmentInfo{}
value := proto.MarshalTextString(segmentInfo)
err = kv.Save(key, value)
assert.NoError(t, err)
err = loader.loadSegment(req, true)
assert.NoError(t, err)
})
}
func TestSegmentLoader_notOnService(t *testing.T) {
@ -161,3 +203,199 @@ func TestSegmentLoader_CheckSegmentMemory(t *testing.T) {
assert.Error(t, err)
})
}
func TestSegmentLoader_loadSegmentFieldsData(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// runLoadSegmentFieldData builds a schema holding a single constant
	// field of the given data type, creates a fresh sealed segment for it,
	// saves matching binlogs (with the mandatory uid/timestamp fields
	// appended for serialization), and checks the loader can read the
	// field data back into the segment.
	runLoadSegmentFieldData := func(dataType schemapb.DataType) {
		historical, err := genSimpleHistorical(ctx)
		assert.NoError(t, err)

		fieldUID := genConstantField(uidField)
		fieldTimestamp := genConstantField(timestampField)

		schema := &schemapb.CollectionSchema{
			Name:   defaultCollectionName,
			AutoID: true,
			Fields: []*schemapb.FieldSchema{},
		}

		constFieldID := FieldID(105)
		constFieldName := "const-field-test"
		constField := &schemapb.FieldSchema{
			FieldID: constFieldID,
			Name:    constFieldName,
			// The original switch assigned dataType verbatim in every
			// branch, so a direct assignment is equivalent for all the
			// types this closure is invoked with.
			DataType: dataType,
		}
		schema.Fields = append(schema.Fields, constField)

		// Drop the pre-existing default segment so we can recreate it
		// against the custom schema.
		err = historical.replica.removeSegment(defaultSegmentID)
		assert.NoError(t, err)

		col := newCollection(defaultCollectionID, schema)
		assert.NotNil(t, col)
		segment := newSegment(col,
			defaultSegmentID,
			defaultPartitionID,
			defaultCollectionID,
			defaultVChannel,
			segmentTypeSealed,
			true)

		// uid/timestamp fields are appended only for binlog serialization,
		// after the collection/segment above were created without them.
		schema.Fields = append(schema.Fields, fieldUID)
		schema.Fields = append(schema.Fields, fieldTimestamp)

		binlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
		assert.NoError(t, err)

		err = historical.loader.loadSegmentFieldsData(segment, binlog)
		assert.NoError(t, err)
	}

	// Renamed from the misleading "test bool": this subtest exercises all
	// scalar data types, not just booleans.
	t.Run("test scalar types", func(t *testing.T) {
		runLoadSegmentFieldData(schemapb.DataType_Bool)
		runLoadSegmentFieldData(schemapb.DataType_Int8)
		runLoadSegmentFieldData(schemapb.DataType_Int16)
		runLoadSegmentFieldData(schemapb.DataType_Int32)
		runLoadSegmentFieldData(schemapb.DataType_Int64)
		runLoadSegmentFieldData(schemapb.DataType_Float)
		runLoadSegmentFieldData(schemapb.DataType_Double)
	})
}
func TestSegmentLoader_invalid(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Run("test loadSegmentOfConditionHandOff", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
err = historical.loader.loadSegmentOfConditionHandOff(nil)
assert.Error(t, err)
})
t.Run("test no collection", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
err = historical.replica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
NodeID: 0,
LoadCondition: querypb.TriggerCondition_grpcRequest,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
},
},
}
err = historical.loader.loadSegment(req, true)
assert.Error(t, err)
})
t.Run("test no collection 2", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
err = historical.replica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
err = historical.loader.loadSegmentInternal(defaultCollectionID, nil, nil)
assert.Error(t, err)
})
t.Run("test no vec field", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
err = historical.replica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
schema := &schemapb.CollectionSchema{
Name: defaultCollectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
genConstantField(constFieldParam{
id: FieldID(100),
dataType: schemapb.DataType_Int8,
}),
},
}
err = historical.loader.historicalReplica.addCollection(defaultCollectionID, schema)
assert.NoError(t, err)
err = historical.loader.loadSegmentInternal(defaultCollectionID, nil, nil)
assert.Error(t, err)
})
t.Run("test no vec field 2", func(t *testing.T) {
historical, err := genSimpleHistorical(ctx)
assert.NoError(t, err)
err = historical.replica.removeCollection(defaultCollectionID)
assert.NoError(t, err)
schema := &schemapb.CollectionSchema{
Name: defaultCollectionName,
AutoID: true,
Fields: []*schemapb.FieldSchema{
genConstantField(constFieldParam{
id: FieldID(100),
dataType: schemapb.DataType_Int8,
}),
},
}
err = historical.loader.historicalReplica.addCollection(defaultCollectionID, schema)
assert.NoError(t, err)
req := &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_WatchQueryChannels,
MsgID: rand.Int63(),
},
NodeID: 0,
Schema: schema,
LoadCondition: querypb.TriggerCondition_grpcRequest,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
},
},
}
err = historical.loader.loadSegment(req, false)
assert.Error(t, err)
})
}

View File

@ -33,21 +33,90 @@ func TestStreaming_search(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
defer streaming.close()
t.Run("test search", func(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
defer streaming.close()
plan, searchReqs, err := genSimpleSearchPlanAndRequests()
assert.NoError(t, err)
plan, searchReqs, err := genSimpleSearchPlanAndRequests()
assert.NoError(t, err)
res, err := streaming.search(searchReqs,
defaultCollectionID,
[]UniqueID{defaultPartitionID},
defaultVChannel,
plan,
Timestamp(0))
assert.NoError(t, err)
assert.Len(t, res, 1)
res, err := streaming.search(searchReqs,
defaultCollectionID,
[]UniqueID{defaultPartitionID},
defaultVChannel,
plan,
Timestamp(0))
assert.NoError(t, err)
assert.Len(t, res, 1)
})
t.Run("test run empty partition", func(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
defer streaming.close()
plan, searchReqs, err := genSimpleSearchPlanAndRequests()
assert.NoError(t, err)
res, err := streaming.search(searchReqs,
defaultCollectionID,
[]UniqueID{},
defaultVChannel,
plan,
Timestamp(0))
assert.NoError(t, err)
assert.Len(t, res, 1)
})
t.Run("test run empty partition and loadCollection", func(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
defer streaming.close()
plan, searchReqs, err := genSimpleSearchPlanAndRequests()
assert.NoError(t, err)
col, err := streaming.replica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
col.setLoadType(loadTypeCollection)
err = streaming.replica.removePartition(defaultPartitionID)
assert.NoError(t, err)
res, err := streaming.search(searchReqs,
defaultCollectionID,
[]UniqueID{defaultPartitionID},
defaultVChannel,
plan,
Timestamp(0))
assert.NoError(t, err)
assert.Nil(t, res)
})
t.Run("test run empty partition and loadPartition", func(t *testing.T) {
streaming, err := genSimpleStreaming(ctx)
assert.NoError(t, err)
defer streaming.close()
plan, searchReqs, err := genSimpleSearchPlanAndRequests()
assert.NoError(t, err)
col, err := streaming.replica.getCollectionByID(defaultCollectionID)
assert.NoError(t, err)
col.setLoadType(loadTypePartition)
err = streaming.replica.removePartition(defaultPartitionID)
assert.NoError(t, err)
_, err = streaming.search(searchReqs,
defaultCollectionID,
[]UniqueID{defaultPartitionID},
defaultVChannel,
plan,
Timestamp(0))
assert.Error(t, err)
})
}
func TestStreaming_retrieve(t *testing.T) {
@ -73,11 +142,24 @@ func TestStreaming_retrieve(t *testing.T) {
err = segment.segmentInsert(offset, &insertMsg.RowIDs, &insertMsg.Timestamps, &insertMsg.RowData)
assert.NoError(t, err)
res, ids, err := streaming.retrieve(defaultCollectionID, []UniqueID{defaultPartitionID}, plan)
assert.NoError(t, err)
assert.Len(t, res, 1)
assert.Len(t, ids, 1)
//assert.Error(t, err)
//assert.Len(t, res, 0)
//assert.Len(t, ids, 0)
t.Run("test retrieve", func(t *testing.T) {
res, ids, err := streaming.retrieve(defaultCollectionID,
[]UniqueID{defaultPartitionID},
plan)
assert.NoError(t, err)
assert.Len(t, res, 1)
assert.Len(t, ids, 1)
//assert.Error(t, err)
//assert.Len(t, res, 0)
//assert.Len(t, ids, 0)
})
t.Run("test empty partition", func(t *testing.T) {
res, ids, err := streaming.retrieve(defaultCollectionID,
[]UniqueID{},
plan)
assert.NoError(t, err)
assert.Len(t, res, 1)
assert.Len(t, ids, 1)
})
}