// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package integration import ( "bytes" "context" "encoding/binary" "encoding/json" "fmt" "math/rand" "strconv" "testing" "time" "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/funcutil" ) func TestHelloMilvus(t *testing.T) { ctx := context.Background() c, err := StartMiniCluster(ctx) assert.NoError(t, err) err = c.Start() assert.NoError(t, err) defer c.Stop() assert.NoError(t, err) prefix := "TestHelloMilvus" dbName := "" collectionName := prefix + funcutil.GenRandomStr() int64Field := "int64" floatVecField := "fvec" dim := 128 rowNum := 3000 constructCollectionSchema := func() *schemapb.CollectionSchema { pk := &schemapb.FieldSchema{ FieldID: 0, Name: int64Field, IsPrimaryKey: true, Description: "", DataType: schemapb.DataType_Int64, TypeParams: nil, IndexParams: nil, AutoID: true, } fVec := &schemapb.FieldSchema{ FieldID: 0, Name: floatVecField, IsPrimaryKey: false, Description: "", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ { Key: "dim", Value: strconv.Itoa(dim), }, }, IndexParams: nil, AutoID: false, } return &schemapb.CollectionSchema{ Name: collectionName, Description: "", AutoID: false, Fields: []*schemapb.FieldSchema{ pk, fVec, }, } } schema := constructCollectionSchema() marshaledSchema, err := proto.Marshal(schema) assert.NoError(t, err) createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, ShardsNum: 2, }) assert.NoError(t, err) if createCollectionStatus.GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("createCollectionStatus fail reason", zap.String("reason", createCollectionStatus.GetReason())) } assert.Equal(t, createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) assert.NoError(t, err) assert.Equal(t, showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim) hashKeys := generateHashKeys(rowNum) insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{ DbName: dbName, CollectionName: collectionName, FieldsData: []*schemapb.FieldData{fVecColumn}, HashKeys: hashKeys, NumRows: uint32(rowNum), }) assert.NoError(t, err) assert.Equal(t, insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) // flush flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, CollectionNames: []string{collectionName}, }) assert.NoError(t, err) segmentIDs, has := flushResp.GetCollSegIDs()[collectionName] ids := segmentIDs.GetData() assert.NotEmpty(t, segmentIDs) segments, err := c.metaWatcher.ShowSegments() assert.NoError(t, err) assert.NotEmpty(t, segments) for _, segment := range segments { log.Info("ShowSegments result", zap.String("segment", segment.String())) } if has && len(ids) > 0 { flushed := func() bool { resp, err := c.proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{ SegmentIDs: ids, }) if err != nil { //panic(errors.New("GetFlushState failed")) return false } return resp.GetFlushed() } for !flushed() { // respect context deadline/cancel select { case <-ctx.Done(): panic(errors.New("deadline exceeded")) default: } time.Sleep(500 * time.Millisecond) } } // create index createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collectionName, FieldName: floatVecField, IndexName: "_default", ExtraParams: []*commonpb.KeyValuePair{ { Key: "dim", Value: strconv.Itoa(dim), }, { Key: common.MetricTypeKey, Value: distance.L2, }, { Key: "index_type", Value: "IVF_FLAT", }, { Key: "nlist", Value: strconv.Itoa(10), }, }, }) if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason())) } assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode()) // load loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ DbName: dbName, CollectionName: collectionName, }) assert.NoError(t, err) if loadStatus.GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("loadStatus fail reason", zap.String("reason", loadStatus.GetReason())) } assert.Equal(t, commonpb.ErrorCode_Success, loadStatus.GetErrorCode()) for { loadProgress, err := c.proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{ CollectionName: collectionName, }) if err != nil { panic("GetLoadingProgress fail") } if loadProgress.GetProgress() == 100 { break } time.Sleep(500 * time.Millisecond) } // search expr := fmt.Sprintf("%s > 0", "int64") nq := 10 topk := 10 roundDecimal := -1 nprobe := 10 searchReq := constructSearchRequest("", collectionName, expr, floatVecField, nq, dim, nprobe, topk, roundDecimal) searchResult, err := c.proxy.Search(ctx, searchReq) if searchResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("searchResult fail reason", zap.String("reason", searchResult.GetStatus().GetReason())) } assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, searchResult.GetStatus().GetErrorCode()) log.Info("TestHelloMilvus succeed") } const ( AnnsFieldKey = "anns_field" TopKKey = "topk" NQKey = "nq" MetricTypeKey = "metric_type" SearchParamsKey = "params" RoundDecimalKey = "round_decimal" OffsetKey = "offset" LimitKey = "limit" ) func constructSearchRequest( dbName, collectionName string, expr string, floatVecField string, nq, dim, nprobe, topk, roundDecimal int, ) *milvuspb.SearchRequest { params := make(map[string]string) params["nprobe"] = strconv.Itoa(nprobe) b, err := json.Marshal(params) if err != nil { panic(err) } plg := constructPlaceholderGroup(nq, dim) plgBs, err := proto.Marshal(plg) if err != nil { panic(err) } return &milvuspb.SearchRequest{ Base: nil, DbName: dbName, CollectionName: collectionName, PartitionNames: nil, Dsl: expr, PlaceholderGroup: plgBs, DslType: commonpb.DslType_BoolExprV1, OutputFields: nil, SearchParams: []*commonpb.KeyValuePair{ { Key: common.MetricTypeKey, Value: distance.L2, }, { Key: SearchParamsKey, Value: string(b), }, { Key: AnnsFieldKey, Value: floatVecField, }, { Key: TopKKey, Value: strconv.Itoa(topk), }, { Key: RoundDecimalKey, Value: strconv.Itoa(roundDecimal), }, }, TravelTimestamp: 0, GuaranteeTimestamp: 0, } } func constructPlaceholderGroup( nq, dim int, ) *commonpb.PlaceholderGroup { values := make([][]byte, 0, nq) for i := 0; i < nq; i++ { bs := make([]byte, 0, dim*4) for j := 0; j < dim; j++ { var buffer bytes.Buffer f := rand.Float32() err := binary.Write(&buffer, common.Endian, f) if err != nil { panic(err) } bs = append(bs, buffer.Bytes()...) } values = append(values, bs) } return &commonpb.PlaceholderGroup{ Placeholders: []*commonpb.PlaceholderValue{ { Tag: "$0", Type: commonpb.PlaceholderType_FloatVector, Values: values, }, }, } } func newFloatVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData { return &schemapb.FieldData{ Type: schemapb.DataType_FloatVector, FieldName: fieldName, Field: &schemapb.FieldData_Vectors{ Vectors: &schemapb.VectorField{ Dim: int64(dim), Data: &schemapb.VectorField_FloatVector{ FloatVector: &schemapb.FloatArray{ Data: generateFloatVectors(numRows, dim), }, }, }, }, } } func generateFloatVectors(numRows, dim int) []float32 { total := numRows * dim ret := make([]float32, 0, total) for i := 0; i < total; i++ { ret = append(ret, rand.Float32()) } return ret } func generateHashKeys(numRows int) []uint32 { ret := make([]uint32, 0, numRows) for i := 0; i < numRows; i++ { ret = append(ret, rand.Uint32()) } return ret }