Fix Integration test for bulk insert (#24018)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/24020/head
congqixia 2023-05-10 19:09:20 +08:00 committed by GitHub
parent 5aa9db0d38
commit 0caa94e91a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 6 deletions

View File

@ -18,7 +18,6 @@ package integration
import (
"context"
"fmt"
"strconv"
"testing"
"time"
@ -65,10 +64,14 @@ func TestBulkInsert(t *testing.T) {
prefix := "TestBulkInsert"
dbName := ""
collectionName := prefix + funcutil.GenRandomStr()
floatVecField := floatVecField
//floatVecField := floatVecField
dim := 128
schema := constructSchema(collectionName, dim, true)
schema := constructSchema(collectionName, dim, true,
&schemapb.FieldSchema{Name: "id", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, AutoID: true},
&schemapb.FieldSchema{Name: "image_path", DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: "max_length", Value: "65535"}}},
&schemapb.FieldSchema{Name: "embeddings", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}}},
)
marshaledSchema, err := proto.Marshal(schema)
assert.NoError(t, err)
@ -81,6 +84,7 @@ func TestBulkInsert(t *testing.T) {
assert.NoError(t, err)
if createCollectionStatus.GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("createCollectionStatus fail reason", zap.String("reason", createCollectionStatus.GetReason()))
t.FailNow()
}
assert.Equal(t, createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
@ -157,7 +161,7 @@ func TestBulkInsert(t *testing.T) {
// create index
createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: floatVecField,
FieldName: "embeddings",
IndexName: "_default",
ExtraParams: constructIndexParam(dim, IndexHNSW, distance.L2),
})
@ -167,6 +171,8 @@ func TestBulkInsert(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode())
waitingForIndexBuilt(ctx, c, t, collectionName, "embeddings")
// load
loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
@ -180,14 +186,14 @@ func TestBulkInsert(t *testing.T) {
waitingForLoad(ctx, c, collectionName)
// search
expr := fmt.Sprintf("%s > 0", int64Field)
expr := "" //fmt.Sprintf("%s > 0", int64Field)
nq := 10
topk := 10
roundDecimal := -1
params := getSearchParams(IndexHNSW, distance.L2)
searchReq := constructSearchRequest("", collectionName, expr,
floatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal)
"embeddings", schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal)
searchResult, err := c.proxy.Search(ctx, searchReq)

View File

@ -17,10 +17,14 @@
package integration
import (
"context"
"fmt"
"strconv"
"testing"
"time"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
)
@ -38,6 +42,38 @@ const (
IndexDISKANN = indexparamcheck.IndexDISKANN
)
func waitingForIndexBuilt(ctx context.Context, cluster *MiniCluster, t *testing.T, collection, field string) {
getIndexBuilt := func() bool {
resp, err := cluster.proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
CollectionName: collection,
FieldName: field,
})
if err != nil {
t.FailNow()
return true
}
for _, desc := range resp.GetIndexDescriptions() {
if desc.GetFieldName() == field {
switch desc.GetState() {
case commonpb.IndexState_Finished:
return true
case commonpb.IndexState_Failed:
return false
}
}
}
return false
}
for !getIndexBuilt() {
select {
case <-ctx.Done():
t.FailNow()
return
case <-time.After(500 * time.Millisecond):
}
}
}
func constructIndexParam(dim int, indexType string, metricType string) []*commonpb.KeyValuePair {
params := []*commonpb.KeyValuePair{
{