Separate integration test package to resolve interference between cases (#24331)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/24336/head
congqixia 2023-05-23 17:51:25 +08:00 committed by GitHub
parent c58bebed6d
commit 52191fe3e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 620 additions and 515 deletions

View File

@ -30,9 +30,10 @@ if [[ $(uname -s) == "Darwin" && "$(uname -m)" == "arm64" ]]; then
APPLE_SILICON_FLAG="-tags dynamic"
fi
pushd tests/integration
go test -race ${APPLE_SILICON_FLAG} -v
popd
for d in $(go list ./tests/integration/...); do
echo "$d"
go test -race ${APPLE_SILICON_FLAG} -v "$d"
done
endTime=`date +%s`

View File

@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
package bulkinsert
import (
"context"
@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/distance"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/tests/integration"
)
const (
@ -46,7 +47,7 @@ const (
)
type BulkInsertSuite struct {
MiniClusterSuite
integration.MiniClusterSuite
}
// test bulk insert E2E
@ -67,7 +68,7 @@ func (s *BulkInsertSuite) TestBulkInsert() {
//floatVecField := floatVecField
dim := 128
schema := constructSchema(collectionName, dim, true,
schema := integration.ConstructSchema(collectionName, dim, true,
&schemapb.FieldSchema{Name: "id", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, AutoID: true},
&schemapb.FieldSchema{Name: "image_path", DataType: schemapb.DataType_VarChar, TypeParams: []*commonpb.KeyValuePair{{Key: common.MaxLengthKey, Value: "65535"}}},
&schemapb.FieldSchema{Name: "embeddings", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "128"}}},
@ -75,7 +76,7 @@ func (s *BulkInsertSuite) TestBulkInsert() {
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
@ -89,19 +90,19 @@ func (s *BulkInsertSuite) TestBulkInsert() {
s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
s.NoError(err)
s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
err = GenerateNumpyFile(c.chunkManager.RootPath()+"/"+"embeddings.npy", 100, schemapb.DataType_FloatVector, []*commonpb.KeyValuePair{
err = GenerateNumpyFile(c.ChunkManager.RootPath()+"/"+"embeddings.npy", 100, schemapb.DataType_FloatVector, []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(Dim),
},
})
s.NoError(err)
err = GenerateNumpyFile(c.chunkManager.RootPath()+"/"+"image_path.npy", 100, schemapb.DataType_VarChar, []*commonpb.KeyValuePair{
err = GenerateNumpyFile(c.ChunkManager.RootPath()+"/"+"image_path.npy", 100, schemapb.DataType_VarChar, []*commonpb.KeyValuePair{
{
Key: common.MaxLengthKey,
Value: strconv.Itoa(65535),
@ -110,14 +111,14 @@ func (s *BulkInsertSuite) TestBulkInsert() {
s.NoError(err)
bulkInsertFiles := []string{
c.chunkManager.RootPath() + "/" + "embeddings.npy",
c.chunkManager.RootPath() + "/" + "image_path.npy",
c.ChunkManager.RootPath() + "/" + "embeddings.npy",
c.ChunkManager.RootPath() + "/" + "image_path.npy",
}
health1, err := c.dataCoord.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
health1, err := c.DataCoord.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
s.NoError(err)
log.Info("dataCoord health", zap.Any("health1", health1))
importResp, err := c.proxy.Import(ctx, &milvuspb.ImportRequest{
importResp, err := c.Proxy.Import(ctx, &milvuspb.ImportRequest{
CollectionName: collectionName,
Files: bulkInsertFiles,
})
@ -128,7 +129,7 @@ func (s *BulkInsertSuite) TestBulkInsert() {
for _, task := range tasks {
loop:
for {
importTaskState, err := c.proxy.GetImportState(ctx, &milvuspb.GetImportStateRequest{
importTaskState, err := c.Proxy.GetImportState(ctx, &milvuspb.GetImportStateRequest{
Task: task,
})
s.NoError(err)
@ -147,11 +148,11 @@ func (s *BulkInsertSuite) TestBulkInsert() {
}
}
health2, err := c.dataCoord.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
health2, err := c.DataCoord.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
s.NoError(err)
log.Info("dataCoord health", zap.Any("health2", health2))
segments, err := c.metaWatcher.ShowSegments()
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
@ -159,11 +160,11 @@ func (s *BulkInsertSuite) TestBulkInsert() {
}
// create index
createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: "embeddings",
IndexName: "_default",
ExtraParams: constructIndexParam(dim, IndexHNSW, distance.L2),
ExtraParams: integration.ConstructIndexParam(dim, integration.IndexHNSW, distance.L2),
})
if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason()))
@ -171,10 +172,10 @@ func (s *BulkInsertSuite) TestBulkInsert() {
s.NoError(err)
s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode())
waitingForIndexBuilt(ctx, c, s.T(), collectionName, "embeddings")
s.WaitForIndexBuilt(ctx, collectionName, "embeddings")
// load
loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
})
@ -183,7 +184,7 @@ func (s *BulkInsertSuite) TestBulkInsert() {
log.Warn("loadStatus fail reason", zap.String("reason", loadStatus.GetReason()))
}
s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
waitingForLoad(ctx, c, collectionName)
s.WaitForLoad(ctx, collectionName)
// search
expr := "" //fmt.Sprintf("%s > 0", int64Field)
@ -191,11 +192,11 @@ func (s *BulkInsertSuite) TestBulkInsert() {
topk := 10
roundDecimal := -1
params := getSearchParams(IndexHNSW, distance.L2)
searchReq := constructSearchRequest("", collectionName, expr,
params := integration.GetSearchParams(integration.IndexHNSW, distance.L2)
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
"embeddings", schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal)
searchResult, err := c.proxy.Search(ctx, searchReq)
searchResult, err := c.Proxy.Search(ctx, searchReq)
if searchResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("searchResult fail reason", zap.String("reason", searchResult.GetStatus().GetReason()))

View File

@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
package getvector
import (
"context"
@ -32,10 +32,11 @@ import (
"github.com/milvus-io/milvus/pkg/util/distance"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/milvus-io/milvus/tests/integration"
)
type TestGetVectorSuite struct {
MiniClusterSuite
integration.MiniClusterSuite
// test params
nq int
@ -47,7 +48,7 @@ type TestGetVectorSuite struct {
}
func (s *TestGetVectorSuite) run() {
ctx, cancel := context.WithCancel(s.Cluster.ctx)
ctx, cancel := context.WithCancel(s.Cluster.GetContext())
defer cancel()
collection := fmt.Sprintf("TestGetVector_%d_%d_%s_%s_%s",
@ -89,11 +90,11 @@ func (s *TestGetVectorSuite) run() {
},
IndexParams: nil,
}
schema := constructSchema(collection, dim, false, pk, fVec)
schema := integration.ConstructSchema(collection, dim, false, pk, fVec)
marshaledSchema, err := proto.Marshal(schema)
s.Require().NoError(err)
createCollectionStatus, err := s.Cluster.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := s.Cluster.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
CollectionName: collection,
Schema: marshaledSchema,
ShardsNum: 2,
@ -103,19 +104,19 @@ func (s *TestGetVectorSuite) run() {
fieldsData := make([]*schemapb.FieldData, 0)
if s.pkType == schemapb.DataType_Int64 {
fieldsData = append(fieldsData, newInt64FieldData(pkFieldName, NB))
fieldsData = append(fieldsData, integration.NewInt64FieldData(pkFieldName, NB))
} else {
fieldsData = append(fieldsData, newStringFieldData(pkFieldName, NB))
fieldsData = append(fieldsData, integration.NewStringFieldData(pkFieldName, NB))
}
var vecFieldData *schemapb.FieldData
if s.vecType == schemapb.DataType_FloatVector {
vecFieldData = newFloatVectorFieldData(vecFieldName, NB, dim)
vecFieldData = integration.NewFloatVectorFieldData(vecFieldName, NB, dim)
} else {
vecFieldData = newBinaryVectorFieldData(vecFieldName, NB, dim)
vecFieldData = integration.NewBinaryVectorFieldData(vecFieldName, NB, dim)
}
fieldsData = append(fieldsData, vecFieldData)
hashKeys := generateHashKeys(NB)
_, err = s.Cluster.proxy.Insert(ctx, &milvuspb.InsertRequest{
hashKeys := integration.GenerateHashKeys(NB)
_, err = s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{
CollectionName: collection,
FieldsData: fieldsData,
HashKeys: hashKeys,
@ -125,7 +126,7 @@ func (s *TestGetVectorSuite) run() {
s.Require().Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
// flush
flushResp, err := s.Cluster.proxy.Flush(ctx, &milvuspb.FlushRequest{
flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
CollectionNames: []string{collection},
})
s.Require().NoError(err)
@ -134,42 +135,42 @@ func (s *TestGetVectorSuite) run() {
s.Require().NotEmpty(segmentIDs)
s.Require().True(has)
segments, err := s.Cluster.metaWatcher.ShowSegments()
segments, err := s.Cluster.MetaWatcher.ShowSegments()
s.Require().NoError(err)
s.Require().NotEmpty(segments)
waitingForFlush(ctx, s.Cluster, ids)
s.WaitForFlush(ctx, ids)
// create index
_, err = s.Cluster.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
_, err = s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collection,
FieldName: vecFieldName,
IndexName: "_default",
ExtraParams: constructIndexParam(dim, s.indexType, s.metricType),
ExtraParams: integration.ConstructIndexParam(dim, s.indexType, s.metricType),
})
s.Require().NoError(err)
s.Require().Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
waitingForIndexBuilt(ctx, s.Cluster, s.T(), collection, vecFieldName)
s.WaitForIndexBuilt(ctx, collection, vecFieldName)
// load
_, err = s.Cluster.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
_, err = s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
CollectionName: collection,
})
s.Require().NoError(err)
s.Require().Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
waitingForLoad(ctx, s.Cluster, collection)
s.WaitForLoad(ctx, collection)
// search
nq := s.nq
topk := s.topK
outputFields := []string{vecFieldName}
params := getSearchParams(s.indexType, s.metricType)
searchReq := constructSearchRequest("", collection, "",
params := integration.GetSearchParams(s.indexType, s.metricType)
searchReq := integration.ConstructSearchRequest("", collection, "",
vecFieldName, s.vecType, outputFields, s.metricType, params, nq, dim, topk, -1)
searchResp, err := s.Cluster.proxy.Search(ctx, searchReq)
searchResp, err := s.Cluster.Proxy.Search(ctx, searchReq)
s.Require().NoError(err)
s.Require().Equal(searchResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
@ -238,7 +239,7 @@ func (s *TestGetVectorSuite) run() {
}
}
status, err := s.Cluster.proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{
status, err := s.Cluster.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{
CollectionName: collection,
})
s.Require().NoError(err)
@ -248,7 +249,7 @@ func (s *TestGetVectorSuite) run() {
func (s *TestGetVectorSuite) TestGetVector_FLAT() {
s.nq = 10
s.topK = 10
s.indexType = IndexFaissIDMap
s.indexType = integration.IndexFaissIDMap
s.metricType = distance.L2
s.pkType = schemapb.DataType_Int64
s.vecType = schemapb.DataType_FloatVector
@ -258,7 +259,7 @@ func (s *TestGetVectorSuite) TestGetVector_FLAT() {
func (s *TestGetVectorSuite) TestGetVector_IVF_FLAT() {
s.nq = 10
s.topK = 10
s.indexType = IndexFaissIvfFlat
s.indexType = integration.IndexFaissIvfFlat
s.metricType = distance.L2
s.pkType = schemapb.DataType_Int64
s.vecType = schemapb.DataType_FloatVector
@ -268,7 +269,7 @@ func (s *TestGetVectorSuite) TestGetVector_IVF_FLAT() {
func (s *TestGetVectorSuite) TestGetVector_IVF_PQ() {
s.nq = 10
s.topK = 10
s.indexType = IndexFaissIvfPQ
s.indexType = integration.IndexFaissIvfPQ
s.metricType = distance.L2
s.pkType = schemapb.DataType_Int64
s.vecType = schemapb.DataType_FloatVector
@ -278,7 +279,7 @@ func (s *TestGetVectorSuite) TestGetVector_IVF_PQ() {
func (s *TestGetVectorSuite) TestGetVector_IVF_SQ8() {
s.nq = 10
s.topK = 10
s.indexType = IndexFaissIvfSQ8
s.indexType = integration.IndexFaissIvfSQ8
s.metricType = distance.L2
s.pkType = schemapb.DataType_Int64
s.vecType = schemapb.DataType_FloatVector
@ -288,7 +289,7 @@ func (s *TestGetVectorSuite) TestGetVector_IVF_SQ8() {
func (s *TestGetVectorSuite) TestGetVector_HNSW() {
s.nq = 10
s.topK = 10
s.indexType = IndexHNSW
s.indexType = integration.IndexHNSW
s.metricType = distance.L2
s.pkType = schemapb.DataType_Int64
s.vecType = schemapb.DataType_FloatVector
@ -298,7 +299,7 @@ func (s *TestGetVectorSuite) TestGetVector_HNSW() {
func (s *TestGetVectorSuite) TestGetVector_IP() {
s.nq = 10
s.topK = 10
s.indexType = IndexHNSW
s.indexType = integration.IndexHNSW
s.metricType = distance.IP
s.pkType = schemapb.DataType_Int64
s.vecType = schemapb.DataType_FloatVector
@ -308,7 +309,7 @@ func (s *TestGetVectorSuite) TestGetVector_IP() {
func (s *TestGetVectorSuite) TestGetVector_StringPK() {
s.nq = 10
s.topK = 10
s.indexType = IndexHNSW
s.indexType = integration.IndexHNSW
s.metricType = distance.L2
s.pkType = schemapb.DataType_VarChar
s.vecType = schemapb.DataType_FloatVector
@ -318,7 +319,7 @@ func (s *TestGetVectorSuite) TestGetVector_StringPK() {
func (s *TestGetVectorSuite) TestGetVector_BinaryVector() {
s.nq = 10
s.topK = 10
s.indexType = IndexFaissBinIvfFlat
s.indexType = integration.IndexFaissBinIvfFlat
s.metricType = distance.JACCARD
s.pkType = schemapb.DataType_Int64
s.vecType = schemapb.DataType_BinaryVector
@ -329,7 +330,7 @@ func (s *TestGetVectorSuite) TestGetVector_Big_NQ_TOPK() {
s.T().Skip("skip big NQ Top due to timeout")
s.nq = 10000
s.topK = 200
s.indexType = IndexHNSW
s.indexType = integration.IndexHNSW
s.metricType = distance.L2
s.pkType = schemapb.DataType_Int64
s.vecType = schemapb.DataType_FloatVector
@ -339,7 +340,7 @@ func (s *TestGetVectorSuite) TestGetVector_Big_NQ_TOPK() {
//func (s *TestGetVectorSuite) TestGetVector_DISKANN() {
// s.nq = 10
// s.topK = 10
// s.indexType = IndexDISKANN
// s.indexType = integration.IndexDISKANN
// s.metricType = distance.L2
// s.pkType = schemapb.DataType_Int64
// s.vecType = schemapb.DataType_FloatVector

View File

@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
package hellomilvus
import (
"context"
@ -32,14 +32,15 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/distance"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/tests/integration"
)
type HelloMilvusSuite struct {
MiniClusterSuite
integration.MiniClusterSuite
}
func (s *HelloMilvusSuite) TestHelloMilvus() {
ctx, cancel := context.WithCancel(s.Cluster.ctx)
ctx, cancel := context.WithCancel(s.Cluster.GetContext())
defer cancel()
c := s.Cluster
@ -51,11 +52,11 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
collectionName := "TestHelloMilvus" + funcutil.GenRandomStr()
schema := constructSchema(collectionName, dim, true)
schema := integration.ConstructSchema(collectionName, dim, true)
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
@ -68,14 +69,14 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
s.NoError(err)
s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
hashKeys := integration.GenerateHashKeys(rowNum)
insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
@ -86,7 +87,7 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
// flush
flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
@ -96,20 +97,20 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
s.NotEmpty(segmentIDs)
s.True(has)
segments, err := c.metaWatcher.ShowSegments()
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
waitingForFlush(ctx, c, ids)
s.WaitForFlush(ctx, ids)
// create index
createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: floatVecField,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.L2),
ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.L2),
})
if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason()))
@ -117,10 +118,10 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
s.NoError(err)
s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode())
waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
// load
loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
})
@ -129,19 +130,19 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
log.Warn("loadStatus fail reason", zap.String("reason", loadStatus.GetReason()))
}
s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
waitingForLoad(ctx, c, collectionName)
s.WaitForLoad(ctx, collectionName)
// search
expr := fmt.Sprintf("%s > 0", int64Field)
expr := fmt.Sprintf("%s > 0", integration.Int64Field)
nq := 10
topk := 10
roundDecimal := -1
params := getSearchParams(IndexFaissIvfFlat, distance.L2)
searchReq := constructSearchRequest("", collectionName, expr,
floatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal)
params := integration.GetSearchParams(integration.IndexFaissIvfFlat, distance.L2)
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal)
searchResult, err := c.proxy.Search(ctx, searchReq)
searchResult, err := c.Proxy.Search(ctx, searchReq)
if searchResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("searchResult fail reason", zap.String("reason", searchResult.GetStatus().GetReason()))

View File

@ -1,9 +1,11 @@
package integration
package indexstat
import (
"context"
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
@ -12,10 +14,11 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/distance"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/tests/integration"
)
type GetIndexStatisticsSuite struct {
MiniClusterSuite
integration.MiniClusterSuite
}
func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() {
@ -29,11 +32,11 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() {
dim := 128
rowNum := 3000
schema := constructSchema(collectionName, dim, true)
schema := integration.ConstructSchema(collectionName, dim, true)
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
@ -45,9 +48,9 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() {
}
s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
hashKeys := integration.GenerateHashKeys(rowNum)
insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
@ -58,7 +61,7 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() {
s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
// flush
flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
@ -67,15 +70,15 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() {
ids := segmentIDs.GetData()
s.NotEmpty(segmentIDs)
s.Equal(true, has)
waitingForFlush(ctx, c, ids)
s.WaitForFlush(ctx, ids)
// create index
indexName := "_default"
createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: floatVecField,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.L2),
ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.L2),
})
if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason()))
@ -83,9 +86,9 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() {
s.NoError(err)
s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode())
waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
getIndexStatisticsResponse, err := c.proxy.GetIndexStatistics(ctx, &milvuspb.GetIndexStatisticsRequest{
getIndexStatisticsResponse, err := c.Proxy.GetIndexStatistics(ctx, &milvuspb.GetIndexStatisticsRequest{
CollectionName: collectionName,
IndexName: indexName,
})
@ -132,7 +135,7 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() {
s.NoError(err)
waitingForIndexBuilt(ctx, c, t, collectionName, floatVecField)
waitingForIndexBuilt(ctx, collectionName, integration.FloatVecField)
getIndexStatisticsResponse2, err := c.proxy.GetIndexStatistics(ctx, &milvuspb.GetIndexStatisticsRequest{
CollectionName: collectionName,
@ -147,3 +150,7 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() {
log.Info("TestGetIndexStatistics succeed")
}
func TestGetIndexStat(t *testing.T) {
suite.Run(t, new(GetIndexStatisticsSuite))
}

View File

@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
package jsonexpr
import (
"context"
@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/distance"
"github.com/milvus-io/milvus/tests/integration"
"github.com/stretchr/testify/suite"
"github.com/golang/protobuf/proto"
@ -39,7 +40,7 @@ import (
)
type JSONExprSuite struct {
MiniClusterSuite
integration.MiniClusterSuite
}
func (s *JSONExprSuite) TestJsonEnableDynamicSchema() {
@ -55,7 +56,7 @@ func (s *JSONExprSuite) TestJsonEnableDynamicSchema() {
constructCollectionSchema := func() *schemapb.CollectionSchema {
pk := &schemapb.FieldSchema{
FieldID: 0,
Name: int64Field,
Name: integration.Int64Field,
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
@ -65,7 +66,7 @@ func (s *JSONExprSuite) TestJsonEnableDynamicSchema() {
}
fVec := &schemapb.FieldSchema{
FieldID: 0,
Name: floatVecField,
Name: integration.FloatVecField,
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_FloatVector,
@ -93,7 +94,7 @@ func (s *JSONExprSuite) TestJsonEnableDynamicSchema() {
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
@ -106,22 +107,22 @@ func (s *JSONExprSuite) TestJsonEnableDynamicSchema() {
s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
s.NoError(err)
s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
describeCollectionResp, err := c.proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: collectionName})
describeCollectionResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: collectionName})
s.NoError(err)
s.True(describeCollectionResp.Schema.EnableDynamicField)
s.Equal(2, len(describeCollectionResp.GetSchema().GetFields()))
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
jsonData := newJSONData(common.MetaFieldName, rowNum)
jsonData.IsDynamic = true
s.insertFlushIndexLoad(ctx, c, dbName, collectionName, rowNum, dim, []*schemapb.FieldData{fVecColumn, jsonData})
s.insertFlushIndexLoad(ctx, dbName, collectionName, rowNum, dim, []*schemapb.FieldData{fVecColumn, jsonData})
s.checkSearch(c, collectionName, common.MetaFieldName, dim)
s.checkSearch(collectionName, common.MetaFieldName, dim)
}
func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() {
@ -138,7 +139,7 @@ func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() {
constructCollectionSchema := func() *schemapb.CollectionSchema {
pk := &schemapb.FieldSchema{
FieldID: 0,
Name: int64Field,
Name: integration.Int64Field,
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
@ -148,7 +149,7 @@ func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() {
}
fVec := &schemapb.FieldSchema{
FieldID: 0,
Name: floatVecField,
Name: integration.FloatVecField,
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_FloatVector,
@ -176,7 +177,7 @@ func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() {
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
@ -189,18 +190,18 @@ func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() {
s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
s.NoError(err)
s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
describeCollectionResp, err := c.proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: collectionName})
describeCollectionResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: collectionName})
s.NoError(err)
s.True(describeCollectionResp.Schema.EnableDynamicField)
s.Equal(2, len(describeCollectionResp.GetSchema().GetFields()))
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
s.insertFlushIndexLoad(ctx, c, dbName, collectionName, rowNum, dim, []*schemapb.FieldData{fVecColumn})
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
s.insertFlushIndexLoad(ctx, dbName, collectionName, rowNum, dim, []*schemapb.FieldData{fVecColumn})
expr := ""
// search
@ -208,7 +209,7 @@ func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() {
checkFunc := func(result *milvuspb.SearchResults) {
s.Equal(0, len(result.Results.FieldsData))
}
s.doSearch(c, collectionName, []string{common.MetaFieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{common.MetaFieldName}, expr, dim, checkFunc)
log.Info("GT expression run successfully")
}
@ -226,7 +227,7 @@ func (s *JSONExprSuite) TestJSON_DynamicSchemaWithJSON() {
constructCollectionSchema := func() *schemapb.CollectionSchema {
pk := &schemapb.FieldSchema{
FieldID: 0,
Name: int64Field,
Name: integration.Int64Field,
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
@ -236,7 +237,7 @@ func (s *JSONExprSuite) TestJSON_DynamicSchemaWithJSON() {
}
fVec := &schemapb.FieldSchema{
FieldID: 0,
Name: floatVecField,
Name: integration.FloatVecField,
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_FloatVector,
@ -250,7 +251,7 @@ func (s *JSONExprSuite) TestJSON_DynamicSchemaWithJSON() {
AutoID: false,
}
j := &schemapb.FieldSchema{
Name: jsonField,
Name: integration.JSONField,
Description: "json field",
DataType: schemapb.DataType_JSON,
}
@ -270,7 +271,7 @@ func (s *JSONExprSuite) TestJSON_DynamicSchemaWithJSON() {
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
@ -283,91 +284,91 @@ func (s *JSONExprSuite) TestJSON_DynamicSchemaWithJSON() {
s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
s.NoError(err)
s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
describeCollectionResp, err := c.proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: collectionName})
describeCollectionResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{CollectionName: collectionName})
s.NoError(err)
s.True(describeCollectionResp.Schema.EnableDynamicField)
s.Equal(3, len(describeCollectionResp.GetSchema().GetFields()))
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
jsonData := newJSONData(jsonField, rowNum)
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
jsonData := newJSONData(integration.JSONField, rowNum)
dynamicData := newJSONData(common.MetaFieldName, rowNum)
dynamicData.IsDynamic = true
s.insertFlushIndexLoad(ctx, c, dbName, collectionName, rowNum, dim, []*schemapb.FieldData{fVecColumn, jsonData, dynamicData})
s.insertFlushIndexLoad(ctx, dbName, collectionName, rowNum, dim, []*schemapb.FieldData{fVecColumn, jsonData, dynamicData})
s.checkSearch(c, collectionName, common.MetaFieldName, dim)
s.checkSearch(collectionName, common.MetaFieldName, dim)
expr := ""
// search
expr = `jsonField["A"] < 10`
checkFunc := func(result *milvuspb.SearchResults) {
s.Equal(1, len(result.Results.FieldsData))
s.Equal(jsonField, result.Results.FieldsData[0].GetFieldName())
s.Equal(integration.JSONField, result.Results.FieldsData[0].GetFieldName())
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(5, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{jsonField}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{integration.JSONField}, expr, dim, checkFunc)
log.Info("LT expression run successfully")
expr = `jsonField["A"] <= 5`
checkFunc = func(result *milvuspb.SearchResults) {
s.Equal(1, len(result.Results.FieldsData))
s.Equal(jsonField, result.Results.FieldsData[0].GetFieldName())
s.Equal(integration.JSONField, result.Results.FieldsData[0].GetFieldName())
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(3, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{jsonField}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{integration.JSONField}, expr, dim, checkFunc)
log.Info("LE expression run successfully")
expr = `jsonField["A"] == 5`
checkFunc = func(result *milvuspb.SearchResults) {
s.Equal(1, len(result.Results.FieldsData))
s.Equal(jsonField, result.Results.FieldsData[0].GetFieldName())
s.Equal(integration.JSONField, result.Results.FieldsData[0].GetFieldName())
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(1, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{jsonField}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{integration.JSONField}, expr, dim, checkFunc)
log.Info("EQ expression run successfully")
expr = `jsonField["C"][0] in [90, 91, 95, 97]`
checkFunc = func(result *milvuspb.SearchResults) {
s.Equal(1, len(result.Results.FieldsData))
s.Equal(jsonField, result.Results.FieldsData[0].GetFieldName())
s.Equal(integration.JSONField, result.Results.FieldsData[0].GetFieldName())
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(4, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{jsonField}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{integration.JSONField}, expr, dim, checkFunc)
log.Info("IN expression run successfully")
expr = `jsonField["C"][0] not in [90, 91, 95, 97]`
checkFunc = func(result *milvuspb.SearchResults) {
s.Equal(1, len(result.Results.FieldsData))
s.Equal(jsonField, result.Results.FieldsData[0].GetFieldName())
s.Equal(integration.JSONField, result.Results.FieldsData[0].GetFieldName())
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{jsonField}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{integration.JSONField}, expr, dim, checkFunc)
log.Info("NIN expression run successfully")
expr = `jsonField["E"]["G"] > 100`
checkFunc = func(result *milvuspb.SearchResults) {
s.Equal(1, len(result.Results.FieldsData))
s.Equal(jsonField, result.Results.FieldsData[0].GetFieldName())
s.Equal(integration.JSONField, result.Results.FieldsData[0].GetFieldName())
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(9, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{jsonField}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{integration.JSONField}, expr, dim, checkFunc)
log.Info("nested path expression run successfully")
expr = `jsonField == ""`
s.doSearchWithInvalidExpr(c, collectionName, []string{jsonField}, expr, dim)
s.doSearchWithInvalidExpr(collectionName, []string{integration.JSONField}, expr, dim)
}
func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName string, dim int) {
func (s *JSONExprSuite) checkSearch(collectionName, fieldName string, dim int) {
expr := ""
// search
expr = `$meta["A"] > 90`
@ -377,7 +378,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(5, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{"A"}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{"A"}, expr, dim, checkFunc)
log.Info("GT expression run successfully")
expr = `$meta["A"] < 10`
@ -387,7 +388,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(5, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{"B"}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{"B"}, expr, dim, checkFunc)
log.Info("LT expression run successfully")
expr = `$meta["A"] <= 5`
@ -397,7 +398,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(3, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{"C"}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{"C"}, expr, dim, checkFunc)
log.Info("LE expression run successfully")
expr = `A >= 95`
@ -407,7 +408,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(3, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("GE expression run successfully")
expr = `$meta["A"] == 5`
@ -417,7 +418,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(1, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("EQ expression run successfully")
expr = `A != 95`
@ -427,7 +428,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("NE expression run successfully")
expr = `not (A != 95)`
@ -437,7 +438,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(1, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("NOT NE expression run successfully")
expr = `A > 90 && B < 5`
@ -447,7 +448,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(2, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("NE expression run successfully")
expr = `A > 95 || 5 > B`
@ -457,7 +458,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(4, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("NE expression run successfully")
expr = `not (A == 95)`
@ -467,7 +468,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("NOT expression run successfully")
expr = `A in [90, 91, 95, 97]`
@ -477,7 +478,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(3, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("IN expression run successfully")
expr = `A not in [90, 91, 95, 97]`
@ -487,7 +488,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("NIN expression run successfully")
expr = `C[0] in [90, 91, 95, 97]`
@ -497,7 +498,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(4, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("IN expression run successfully")
expr = `C[0] not in [90, 91, 95, 97]`
@ -507,7 +508,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("NIN expression run successfully")
expr = `0 <= A < 5`
@ -517,7 +518,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(2, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("BinaryRange expression run successfully")
expr = `100 > A >= 90`
@ -527,7 +528,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(5, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("BinaryRange expression run successfully")
expr = `1+5 <= A < 5+10`
@ -537,7 +538,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(4, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("BinaryRange expression run successfully")
expr = `A + 5 == 10`
@ -547,7 +548,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(1, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("Arithmetic expression run successfully")
expr = `exists A`
@ -557,14 +558,14 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("EXISTS expression run successfully")
expr = `exists AAA`
checkFunc = func(result *milvuspb.SearchResults) {
s.Equal(0, len(result.Results.FieldsData))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("EXISTS expression run successfully")
expr = `not exists A`
@ -574,7 +575,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("NOT EXISTS expression run successfully")
expr = `E["G"] > 100`
@ -584,7 +585,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(9, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("nested path expression run successfully")
expr = `D like "name-%"`
@ -594,7 +595,7 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("like expression run successfully")
expr = `D like "name-11"`
@ -604,21 +605,21 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(1, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("like expression run successfully")
expr = `A like "10"`
checkFunc = func(result *milvuspb.SearchResults) {
s.Equal(0, len(result.Results.FieldsData))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("like expression run successfully")
expr = `A in []`
checkFunc = func(result *milvuspb.SearchResults) {
s.Equal(0, len(result.Results.FieldsData))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("term empty expression run successfully")
expr = `A not in []`
@ -628,47 +629,47 @@ func (s *JSONExprSuite) checkSearch(c *MiniCluster, collectionName, fieldName st
s.Equal(schemapb.DataType_JSON, result.Results.FieldsData[0].GetType())
s.Equal(10, len(result.Results.FieldsData[0].GetScalars().GetJsonData().GetData()))
}
s.doSearch(c, collectionName, []string{fieldName}, expr, dim, checkFunc)
s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc)
log.Info("term empty expression run successfully")
// invalid expr
expr = `E[F] > 100`
s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim)
s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim)
expr = `A >> 10`
s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim)
s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim)
expr = `not A > 5`
s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim)
s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim)
expr = `not A == 5`
s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim)
s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim)
expr = `A > B`
s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim)
s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim)
expr = `A > Int64Field`
s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim)
s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim)
expr = `A like abc`
s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim)
s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim)
expr = `D like "%name-%"`
s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim)
s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim)
expr = `D like "na%me"`
s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim)
s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim)
expr = `1+5 <= A+1 < 5+10`
s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim)
s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim)
expr = `$meta == ""`
s.doSearchWithInvalidExpr(c, collectionName, []string{fieldName}, expr, dim)
s.doSearchWithInvalidExpr(collectionName, []string{fieldName}, expr, dim)
}
func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster, dbName, collectionName string, rowNum int, dim int, fieldData []*schemapb.FieldData) {
hashKeys := generateHashKeys(rowNum)
insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{
func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, dbName, collectionName string, rowNum int, dim int, fieldData []*schemapb.FieldData) {
hashKeys := integration.GenerateHashKeys(rowNum)
insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: fieldData,
@ -679,7 +680,7 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster
s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
// flush
flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{
flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
@ -688,7 +689,7 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster
ids := segmentIDs.GetData()
s.NotEmpty(segmentIDs)
segments, err := c.metaWatcher.ShowSegments()
segments, err := s.Cluster.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
@ -697,7 +698,7 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster
if has && len(ids) > 0 {
flushed := func() bool {
resp, err := c.proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{
resp, err := s.Cluster.Proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{
SegmentIDs: ids,
})
if err != nil {
@ -718,9 +719,9 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster
}
// create index
createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: floatVecField,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: []*commonpb.KeyValuePair{
{
@ -746,10 +747,10 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster
}
s.NoError(err)
s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode())
waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
// load
loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
})
@ -759,7 +760,7 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster
}
s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
for {
loadProgress, err := c.proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{
loadProgress, err := s.Cluster.Proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{
CollectionName: collectionName,
})
if err != nil {
@ -772,16 +773,16 @@ func (s *JSONExprSuite) insertFlushIndexLoad(ctx context.Context, c *MiniCluster
}
}
func (s *JSONExprSuite) doSearch(cluster *MiniCluster, collectionName string, outputField []string, expr string, dim int, checkFunc func(results *milvuspb.SearchResults)) {
func (s *JSONExprSuite) doSearch(collectionName string, outputField []string, expr string, dim int, checkFunc func(results *milvuspb.SearchResults)) {
nq := 1
topk := 10
roundDecimal := -1
params := getSearchParams(IndexFaissIvfFlat, distance.L2)
searchReq := constructSearchRequest("", collectionName, expr,
floatVecField, schemapb.DataType_FloatVector, outputField, distance.L2, params, nq, dim, topk, roundDecimal)
params := integration.GetSearchParams(integration.IndexFaissIvfFlat, distance.L2)
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, schemapb.DataType_FloatVector, outputField, distance.L2, params, nq, dim, topk, roundDecimal)
searchResult, err := cluster.proxy.Search(context.Background(), searchReq)
searchResult, err := s.Cluster.Proxy.Search(context.Background(), searchReq)
if searchResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("searchResult fail reason", zap.String("reason", searchResult.GetStatus().GetReason()))
@ -845,16 +846,16 @@ func newJSONData(fieldName string, rowNum int) *schemapb.FieldData {
}
}
func (s *JSONExprSuite) doSearchWithInvalidExpr(cluster *MiniCluster, collectionName string, outputField []string, expr string, dim int) {
func (s *JSONExprSuite) doSearchWithInvalidExpr(collectionName string, outputField []string, expr string, dim int) {
nq := 1
topk := 10
roundDecimal := -1
params := getSearchParams(IndexFaissIvfFlat, distance.L2)
searchReq := constructSearchRequest("", collectionName, expr,
floatVecField, schemapb.DataType_FloatVector, outputField, distance.L2, params, nq, dim, topk, roundDecimal)
params := integration.GetSearchParams(integration.IndexFaissIvfFlat, distance.L2)
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, schemapb.DataType_FloatVector, outputField, distance.L2, params, nq, dim, topk, roundDecimal)
searchResult, err := cluster.proxy.Search(context.Background(), searchReq)
searchResult, err := s.Cluster.Proxy.Search(context.Background(), searchReq)
if searchResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("searchResult fail reason", zap.String("reason", searchResult.GetStatus().GetReason()))

View File

@ -41,7 +41,7 @@ type MetaWatcherSuite struct {
}
func (s *MetaWatcherSuite) TestShowSessions() {
sessions, err := s.Cluster.metaWatcher.ShowSessions()
sessions, err := s.Cluster.MetaWatcher.ShowSessions()
s.NoError(err)
s.NotEmpty(sessions)
for _, session := range sessions {
@ -102,7 +102,7 @@ func (s *MetaWatcherSuite) TestShowSegments() {
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
@ -112,14 +112,14 @@ func (s *MetaWatcherSuite) TestShowSegments() {
s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
s.NoError(err)
s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{
fVecColumn := NewFloatVectorFieldData(floatVecField, rowNum, dim)
hashKeys := GenerateHashKeys(rowNum)
insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
@ -129,7 +129,7 @@ func (s *MetaWatcherSuite) TestShowSegments() {
s.NoError(err)
s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
segments, err := c.metaWatcher.ShowSegments()
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
@ -190,7 +190,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() {
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
@ -203,14 +203,14 @@ func (s *MetaWatcherSuite) TestShowReplicas() {
s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
s.NoError(err)
s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{
fVecColumn := NewFloatVectorFieldData(floatVecField, rowNum, dim)
hashKeys := GenerateHashKeys(rowNum)
insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
@ -221,7 +221,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() {
s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
// flush
flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
@ -230,7 +230,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() {
ids := segmentIDs.GetData()
s.NotEmpty(segmentIDs)
segments, err := c.metaWatcher.ShowSegments()
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
@ -239,7 +239,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() {
if has && len(ids) > 0 {
flushed := func() bool {
resp, err := c.proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{
resp, err := c.Proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{
SegmentIDs: ids,
})
if err != nil {
@ -260,7 +260,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() {
}
// create index
createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: floatVecField,
IndexName: "_default",
@ -292,7 +292,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() {
waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField)
// load
loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
})
@ -302,7 +302,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() {
}
s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
for {
loadProgress, err := c.proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{
loadProgress, err := c.Proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{
CollectionName: collectionName,
})
if err != nil {
@ -314,7 +314,7 @@ func (s *MetaWatcherSuite) TestShowReplicas() {
time.Sleep(500 * time.Millisecond)
}
replicas, err := c.metaWatcher.ShowReplicas()
replicas, err := c.MetaWatcher.ShowReplicas()
s.NoError(err)
s.NotEmpty(replicas)
for _, replica := range replicas {

View File

@ -101,20 +101,20 @@ type MiniCluster struct {
clusterConfig ClusterConfig
factory dependency.Factory
chunkManager storage.ChunkManager
ChunkManager storage.ChunkManager
etcdCli *clientv3.Client
EtcdCli *clientv3.Client
proxy types.ProxyComponent
dataCoord types.DataCoordComponent
rootCoord types.RootCoordComponent
Proxy types.ProxyComponent
DataCoord types.DataCoordComponent
RootCoord types.RootCoordComponent
QueryCoord types.QueryCoordComponent
queryCoord types.QueryCoordComponent
queryNodes []types.QueryNodeComponent
dataNodes []types.DataNodeComponent
indexNodes []types.IndexNodeComponent
QueryNodes []types.QueryNodeComponent
DataNodes []types.DataNodeComponent
IndexNodes []types.IndexNodeComponent
metaWatcher MetaWatcher
MetaWatcher MetaWatcher
}
var params *paramtable.ComponentParam = paramtable.Get()
@ -145,10 +145,10 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster
if err != nil {
return nil, err
}
cluster.chunkManager = chunkManager
cluster.ChunkManager = chunkManager
}
if cluster.etcdCli == nil {
if cluster.EtcdCli == nil {
var etcdCli *clientv3.Client
etcdCli, err = etcd.GetEtcdClient(
params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
@ -161,39 +161,39 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster
if err != nil {
return nil, err
}
cluster.etcdCli = etcdCli
cluster.EtcdCli = etcdCli
}
cluster.metaWatcher = &EtcdMetaWatcher{
cluster.MetaWatcher = &EtcdMetaWatcher{
rootPath: cluster.params[EtcdRootPath],
etcdCli: cluster.etcdCli,
etcdCli: cluster.EtcdCli,
}
if cluster.rootCoord == nil {
if cluster.RootCoord == nil {
var rootCoord types.RootCoordComponent
rootCoord, err = cluster.CreateDefaultRootCoord()
if err != nil {
return nil, err
}
cluster.rootCoord = rootCoord
cluster.RootCoord = rootCoord
}
if cluster.dataCoord == nil {
if cluster.DataCoord == nil {
var dataCoord types.DataCoordComponent
dataCoord, err = cluster.CreateDefaultDataCoord()
if err != nil {
return nil, err
}
cluster.dataCoord = dataCoord
cluster.DataCoord = dataCoord
}
if cluster.queryCoord == nil {
if cluster.QueryCoord == nil {
var queryCoord types.QueryCoordComponent
queryCoord, err = cluster.CreateDefaultQueryCoord()
if err != nil {
return nil, err
}
cluster.queryCoord = queryCoord
cluster.QueryCoord = queryCoord
}
//if cluster.indexCoord == nil {
@ -205,7 +205,7 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster
// cluster.indexCoord = indexCoord
//}
if cluster.dataNodes == nil {
if cluster.DataNodes == nil {
dataNodes := make([]types.DataNodeComponent, 0)
for i := 0; i < cluster.clusterConfig.DataNodeNum; i++ {
var dataNode types.DataNodeComponent
@ -215,10 +215,10 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster
}
dataNodes = append(dataNodes, dataNode)
}
cluster.dataNodes = dataNodes
cluster.DataNodes = dataNodes
}
if cluster.queryNodes == nil {
if cluster.QueryNodes == nil {
queryNodes := make([]types.QueryNodeComponent, 0)
for i := 0; i < cluster.clusterConfig.QueryNodeNum; i++ {
var queryNode types.QueryNodeComponent
@ -228,10 +228,10 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster
}
queryNodes = append(queryNodes, queryNode)
}
cluster.queryNodes = queryNodes
cluster.QueryNodes = queryNodes
}
if cluster.indexNodes == nil {
if cluster.IndexNodes == nil {
indexNodes := make([]types.IndexNodeComponent, 0)
for i := 0; i < cluster.clusterConfig.IndexNodeNum; i++ {
var indexNode types.IndexNodeComponent
@ -241,22 +241,22 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster
}
indexNodes = append(indexNodes, indexNode)
}
cluster.indexNodes = indexNodes
cluster.IndexNodes = indexNodes
}
if cluster.proxy == nil {
if cluster.Proxy == nil {
var proxy types.ProxyComponent
proxy, err = cluster.CreateDefaultProxy()
if err != nil {
return
}
cluster.proxy = proxy
cluster.Proxy = proxy
}
//cluster.dataCoord.SetIndexCoord(cluster.indexCoord)
cluster.dataCoord.SetRootCoord(cluster.rootCoord)
cluster.DataCoord.SetRootCoord(cluster.RootCoord)
err = cluster.rootCoord.SetDataCoord(cluster.dataCoord)
err = cluster.RootCoord.SetDataCoord(cluster.DataCoord)
if err != nil {
return
}
@ -264,7 +264,7 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster
//if err != nil {
// return
//}
err = cluster.rootCoord.SetQueryCoord(cluster.queryCoord)
err = cluster.RootCoord.SetQueryCoord(cluster.QueryCoord)
if err != nil {
return
}
@ -273,11 +273,11 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster
if err != nil {
return
}
err = cluster.queryCoord.SetDataCoord(cluster.dataCoord)
err = cluster.QueryCoord.SetDataCoord(cluster.DataCoord)
if err != nil {
return
}
err = cluster.queryCoord.SetRootCoord(cluster.rootCoord)
err = cluster.QueryCoord.SetRootCoord(cluster.RootCoord)
if err != nil {
return
}
@ -291,21 +291,21 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster
// return
//}
for _, dataNode := range cluster.dataNodes {
err = dataNode.SetDataCoord(cluster.dataCoord)
for _, dataNode := range cluster.DataNodes {
err = dataNode.SetDataCoord(cluster.DataCoord)
if err != nil {
return
}
err = dataNode.SetRootCoord(cluster.rootCoord)
err = dataNode.SetRootCoord(cluster.RootCoord)
if err != nil {
return
}
}
cluster.proxy.SetDataCoordClient(cluster.dataCoord)
cluster.Proxy.SetDataCoordClient(cluster.DataCoord)
//cluster.proxy.SetIndexCoordClient(cluster.indexCoord)
cluster.proxy.SetQueryCoordClient(cluster.queryCoord)
cluster.proxy.SetRootCoordClient(cluster.rootCoord)
cluster.Proxy.SetQueryCoordClient(cluster.QueryCoord)
cluster.Proxy.SetRootCoordClient(cluster.RootCoord)
return cluster, nil
}
@ -316,41 +316,41 @@ func (cluster *MiniCluster) GetContext() context.Context {
func (cluster *MiniCluster) Start() error {
log.Info("mini cluster start")
err := cluster.rootCoord.Init()
err := cluster.RootCoord.Init()
if err != nil {
return err
}
err = cluster.rootCoord.Start()
err = cluster.RootCoord.Start()
if err != nil {
return err
}
err = cluster.rootCoord.Register()
err = cluster.RootCoord.Register()
if err != nil {
return err
}
err = cluster.dataCoord.Init()
err = cluster.DataCoord.Init()
if err != nil {
return err
}
err = cluster.dataCoord.Start()
err = cluster.DataCoord.Start()
if err != nil {
return err
}
err = cluster.dataCoord.Register()
err = cluster.DataCoord.Register()
if err != nil {
return err
}
err = cluster.queryCoord.Init()
err = cluster.QueryCoord.Init()
if err != nil {
return err
}
err = cluster.queryCoord.Start()
err = cluster.QueryCoord.Start()
if err != nil {
return err
}
err = cluster.queryCoord.Register()
err = cluster.QueryCoord.Register()
if err != nil {
return err
}
@ -368,7 +368,7 @@ func (cluster *MiniCluster) Start() error {
// return err
//}
for _, dataNode := range cluster.dataNodes {
for _, dataNode := range cluster.DataNodes {
err = dataNode.Init()
if err != nil {
return err
@ -383,7 +383,7 @@ func (cluster *MiniCluster) Start() error {
}
}
for _, queryNode := range cluster.queryNodes {
for _, queryNode := range cluster.QueryNodes {
err = queryNode.Init()
if err != nil {
return err
@ -398,7 +398,7 @@ func (cluster *MiniCluster) Start() error {
}
}
for _, indexNode := range cluster.indexNodes {
for _, indexNode := range cluster.IndexNodes {
err = indexNode.Init()
if err != nil {
return err
@ -413,15 +413,15 @@ func (cluster *MiniCluster) Start() error {
}
}
err = cluster.proxy.Init()
err = cluster.Proxy.Init()
if err != nil {
return err
}
err = cluster.proxy.Start()
err = cluster.Proxy.Start()
if err != nil {
return err
}
err = cluster.proxy.Register()
err = cluster.Proxy.Register()
if err != nil {
return err
}
@ -431,43 +431,43 @@ func (cluster *MiniCluster) Start() error {
func (cluster *MiniCluster) Stop() error {
log.Info("mini cluster stop")
cluster.rootCoord.Stop()
cluster.RootCoord.Stop()
log.Info("mini cluster rootCoord stopped")
cluster.dataCoord.Stop()
cluster.DataCoord.Stop()
log.Info("mini cluster dataCoord stopped")
//cluster.indexCoord.Stop()
cluster.queryCoord.Stop()
cluster.QueryCoord.Stop()
log.Info("mini cluster queryCoord stopped")
cluster.proxy.Stop()
cluster.Proxy.Stop()
log.Info("mini cluster proxy stopped")
for _, dataNode := range cluster.dataNodes {
for _, dataNode := range cluster.DataNodes {
dataNode.Stop()
}
log.Info("mini cluster datanodes stopped")
for _, queryNode := range cluster.queryNodes {
for _, queryNode := range cluster.QueryNodes {
queryNode.Stop()
}
log.Info("mini cluster querynodes stopped")
for _, indexNode := range cluster.indexNodes {
for _, indexNode := range cluster.IndexNodes {
indexNode.Stop()
}
log.Info("mini cluster indexnodes stopped")
cluster.etcdCli.KV.Delete(cluster.ctx, params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix())
defer cluster.etcdCli.Close()
cluster.EtcdCli.KV.Delete(cluster.ctx, params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix())
defer cluster.EtcdCli.Close()
if cluster.chunkManager == nil {
if cluster.ChunkManager == nil {
chunkManager, err := cluster.factory.NewPersistentStorageChunkManager(cluster.ctx)
if err != nil {
log.Warn("fail to create chunk manager to clean test data", zap.Error(err))
} else {
cluster.chunkManager = chunkManager
cluster.ChunkManager = chunkManager
}
}
cluster.chunkManager.RemoveWithPrefix(cluster.ctx, cluster.chunkManager.RootPath())
cluster.ChunkManager.RemoveWithPrefix(cluster.ctx, cluster.ChunkManager.RootPath())
return nil
}
@ -506,7 +506,7 @@ func WithClusterSize(clusterConfig ClusterConfig) Option {
func WithEtcdClient(etcdCli *clientv3.Client) Option {
return func(cluster *MiniCluster) {
cluster.etcdCli = etcdCli
cluster.EtcdCli = etcdCli
}
}
@ -518,19 +518,19 @@ func WithFactory(factory dependency.Factory) Option {
func WithRootCoord(rootCoord types.RootCoordComponent) Option {
return func(cluster *MiniCluster) {
cluster.rootCoord = rootCoord
cluster.RootCoord = rootCoord
}
}
func WithDataCoord(dataCoord types.DataCoordComponent) Option {
return func(cluster *MiniCluster) {
cluster.dataCoord = dataCoord
cluster.DataCoord = dataCoord
}
}
func WithQueryCoord(queryCoord types.QueryCoordComponent) Option {
return func(cluster *MiniCluster) {
cluster.queryCoord = queryCoord
cluster.QueryCoord = queryCoord
}
}
@ -542,25 +542,25 @@ func WithQueryCoord(queryCoord types.QueryCoordComponent) Option {
func WithDataNodes(datanodes []types.DataNodeComponent) Option {
return func(cluster *MiniCluster) {
cluster.dataNodes = datanodes
cluster.DataNodes = datanodes
}
}
func WithQueryNodes(queryNodes []types.QueryNodeComponent) Option {
return func(cluster *MiniCluster) {
cluster.queryNodes = queryNodes
cluster.QueryNodes = queryNodes
}
}
func WithIndexNodes(indexNodes []types.IndexNodeComponent) Option {
return func(cluster *MiniCluster) {
cluster.indexNodes = indexNodes
cluster.IndexNodes = indexNodes
}
}
func WithProxy(proxy types.ProxyComponent) Option {
return func(cluster *MiniCluster) {
cluster.proxy = proxy
cluster.Proxy = proxy
}
}
@ -572,7 +572,7 @@ func (cluster *MiniCluster) CreateDefaultRootCoord() (types.RootCoordComponent,
port := funcutil.GetAvailablePort()
rootCoord.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
rootCoord.SetProxyCreator(cluster.GetProxy)
rootCoord.SetEtcdClient(cluster.etcdCli)
rootCoord.SetEtcdClient(cluster.EtcdCli)
return rootCoord, nil
}
@ -582,7 +582,7 @@ func (cluster *MiniCluster) CreateDefaultDataCoord() (types.DataCoordComponent,
dataCoord.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
dataCoord.SetDataNodeCreator(cluster.GetDataNode)
dataCoord.SetIndexNodeCreator(cluster.GetIndexNode)
dataCoord.SetEtcdClient(cluster.etcdCli)
dataCoord.SetEtcdClient(cluster.EtcdCli)
return dataCoord, nil
}
@ -594,7 +594,7 @@ func (cluster *MiniCluster) CreateDefaultQueryCoord() (types.QueryCoordComponent
port := funcutil.GetAvailablePort()
queryCoord.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
queryCoord.SetQueryNodeCreator(cluster.GetQueryNode)
queryCoord.SetEtcdClient(cluster.etcdCli)
queryCoord.SetEtcdClient(cluster.EtcdCli)
return queryCoord, nil
}
@ -613,7 +613,7 @@ func (cluster *MiniCluster) CreateDefaultQueryCoord() (types.QueryCoordComponent
func (cluster *MiniCluster) CreateDefaultDataNode() (types.DataNodeComponent, error) {
log.Debug("mini cluster CreateDefaultDataNode")
dataNode := datanode.NewDataNode(cluster.ctx, cluster.factory)
dataNode.SetEtcdClient(cluster.etcdCli)
dataNode.SetEtcdClient(cluster.EtcdCli)
port := funcutil.GetAvailablePort()
dataNode.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
return dataNode, nil
@ -622,7 +622,7 @@ func (cluster *MiniCluster) CreateDefaultDataNode() (types.DataNodeComponent, er
func (cluster *MiniCluster) CreateDefaultQueryNode() (types.QueryNodeComponent, error) {
log.Debug("mini cluster CreateDefaultQueryNode")
queryNode := querynodev2.NewQueryNode(cluster.ctx, cluster.factory)
queryNode.SetEtcdClient(cluster.etcdCli)
queryNode.SetEtcdClient(cluster.EtcdCli)
port := funcutil.GetAvailablePort()
queryNode.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
return queryNode, nil
@ -631,7 +631,7 @@ func (cluster *MiniCluster) CreateDefaultQueryNode() (types.QueryNodeComponent,
func (cluster *MiniCluster) CreateDefaultIndexNode() (types.IndexNodeComponent, error) {
log.Debug("mini cluster CreateDefaultIndexNode")
indexNode := indexnode.NewIndexNode(cluster.ctx, cluster.factory)
indexNode.SetEtcdClient(cluster.etcdCli)
indexNode.SetEtcdClient(cluster.EtcdCli)
port := funcutil.GetAvailablePort()
indexNode.SetAddress(funcutil.GetLocalIP() + ":" + fmt.Sprint(port))
return indexNode, nil
@ -640,7 +640,7 @@ func (cluster *MiniCluster) CreateDefaultIndexNode() (types.IndexNodeComponent,
func (cluster *MiniCluster) CreateDefaultProxy() (types.ProxyComponent, error) {
log.Debug("mini cluster CreateDefaultProxy")
proxy, err := proxy2.NewProxy(cluster.ctx, cluster.factory)
proxy.SetEtcdClient(cluster.etcdCli)
proxy.SetEtcdClient(cluster.EtcdCli)
if err != nil {
return nil, err
}
@ -657,7 +657,7 @@ func (cluster *MiniCluster) AddRootCoord(rootCoord types.RootCoordComponent) err
cluster.mu.Lock()
defer cluster.mu.Unlock()
var err error
if cluster.rootCoord != nil {
if cluster.RootCoord != nil {
return errors.New("rootCoord already exist, maybe you need to remove it first")
}
if rootCoord == nil {
@ -668,14 +668,14 @@ func (cluster *MiniCluster) AddRootCoord(rootCoord types.RootCoordComponent) err
}
// link
rootCoord.SetDataCoord(cluster.dataCoord)
rootCoord.SetQueryCoord(cluster.queryCoord)
rootCoord.SetDataCoord(cluster.DataCoord)
rootCoord.SetQueryCoord(cluster.QueryCoord)
//rootCoord.SetIndexCoord(cluster.indexCoord)
cluster.dataCoord.SetRootCoord(rootCoord)
cluster.queryCoord.SetRootCoord(rootCoord)
cluster.DataCoord.SetRootCoord(rootCoord)
cluster.QueryCoord.SetRootCoord(rootCoord)
//cluster.indexCoord.SetRootCoord(rootCoord)
cluster.proxy.SetRootCoordClient(rootCoord)
for _, dataNode := range cluster.dataNodes {
cluster.Proxy.SetRootCoordClient(rootCoord)
for _, dataNode := range cluster.DataNodes {
err = dataNode.SetRootCoord(rootCoord)
if err != nil {
return err
@ -696,7 +696,7 @@ func (cluster *MiniCluster) AddRootCoord(rootCoord types.RootCoordComponent) err
return err
}
cluster.rootCoord = rootCoord
cluster.RootCoord = rootCoord
log.Debug("mini cluster AddRootCoord succeed")
return nil
}
@ -707,13 +707,13 @@ func (cluster *MiniCluster) RemoveRootCoord(rootCoord types.RootCoordComponent)
cluster.mu.Lock()
defer cluster.mu.Unlock()
if cluster.rootCoord == nil {
if cluster.RootCoord == nil {
log.Info("mini cluster has no rootCoord, no need to remove")
return nil
}
cluster.rootCoord.Stop()
cluster.rootCoord = nil
cluster.RootCoord.Stop()
cluster.RootCoord = nil
log.Debug("mini cluster RemoveRootCoord succeed")
return nil
}
@ -725,7 +725,7 @@ func (cluster *MiniCluster) AddDataCoord(dataCoord types.DataCoordComponent) err
cluster.mu.Lock()
defer cluster.mu.Unlock()
var err error
if cluster.dataCoord != nil {
if cluster.DataCoord != nil {
return errors.New("dataCoord already exist, maybe you need to remove it first")
}
if dataCoord == nil {
@ -737,12 +737,12 @@ func (cluster *MiniCluster) AddDataCoord(dataCoord types.DataCoordComponent) err
// link
//dataCoord.SetIndexCoord(cluster.indexCoord)
dataCoord.SetRootCoord(cluster.rootCoord)
err = cluster.rootCoord.SetDataCoord(cluster.dataCoord)
dataCoord.SetRootCoord(cluster.RootCoord)
err = cluster.RootCoord.SetDataCoord(cluster.DataCoord)
if err != nil {
return err
}
err = cluster.queryCoord.SetDataCoord(cluster.dataCoord)
err = cluster.QueryCoord.SetDataCoord(cluster.DataCoord)
if err != nil {
return err
}
@ -750,8 +750,8 @@ func (cluster *MiniCluster) AddDataCoord(dataCoord types.DataCoordComponent) err
//if err != nil {
// return err
//}
cluster.proxy.SetDataCoordClient(dataCoord)
for _, dataNode := range cluster.dataNodes {
cluster.Proxy.SetDataCoordClient(dataCoord)
for _, dataNode := range cluster.DataNodes {
err = dataNode.SetDataCoord(dataCoord)
if err != nil {
return err
@ -772,7 +772,7 @@ func (cluster *MiniCluster) AddDataCoord(dataCoord types.DataCoordComponent) err
return err
}
cluster.dataCoord = dataCoord
cluster.DataCoord = dataCoord
log.Debug("mini cluster AddDataCoord succeed")
return nil
}
@ -783,13 +783,13 @@ func (cluster *MiniCluster) RemoveDataCoord(dataCoord types.DataCoordComponent)
cluster.mu.Lock()
defer cluster.mu.Unlock()
if cluster.dataCoord == nil {
if cluster.DataCoord == nil {
log.Info("mini cluster has no dataCoord, no need to remove")
return nil
}
cluster.dataCoord.Stop()
cluster.dataCoord = nil
cluster.DataCoord.Stop()
cluster.DataCoord = nil
log.Debug("mini cluster RemoveDataCoord succeed")
return nil
}
@ -801,7 +801,7 @@ func (cluster *MiniCluster) AddQueryCoord(queryCoord types.QueryCoordComponent)
cluster.mu.Lock()
defer cluster.mu.Unlock()
var err error
if cluster.queryCoord != nil {
if cluster.QueryCoord != nil {
return errors.New("queryCoord already exist, maybe you need to remove it first")
}
if queryCoord == nil {
@ -812,11 +812,11 @@ func (cluster *MiniCluster) AddQueryCoord(queryCoord types.QueryCoordComponent)
}
// link
queryCoord.SetRootCoord(cluster.rootCoord)
queryCoord.SetDataCoord(cluster.dataCoord)
queryCoord.SetRootCoord(cluster.RootCoord)
queryCoord.SetDataCoord(cluster.DataCoord)
//queryCoord.SetIndexCoord(cluster.indexCoord)
cluster.rootCoord.SetQueryCoord(queryCoord)
cluster.proxy.SetQueryCoordClient(queryCoord)
cluster.RootCoord.SetQueryCoord(queryCoord)
cluster.Proxy.SetQueryCoordClient(queryCoord)
// start
err = queryCoord.Init()
@ -832,7 +832,7 @@ func (cluster *MiniCluster) AddQueryCoord(queryCoord types.QueryCoordComponent)
return err
}
cluster.queryCoord = queryCoord
cluster.QueryCoord = queryCoord
log.Debug("mini cluster AddQueryCoord succeed")
return nil
}
@ -843,13 +843,13 @@ func (cluster *MiniCluster) RemoveQueryCoord(queryCoord types.QueryCoordComponen
cluster.mu.Lock()
defer cluster.mu.Unlock()
if cluster.queryCoord == nil {
if cluster.QueryCoord == nil {
log.Info("mini cluster has no queryCoord, no need to remove")
return nil
}
cluster.queryCoord.Stop()
cluster.queryCoord = nil
cluster.QueryCoord.Stop()
cluster.QueryCoord = nil
log.Debug("mini cluster RemoveQueryCoord succeed")
return nil
}
@ -928,11 +928,11 @@ func (cluster *MiniCluster) AddDataNode(dataNode types.DataNodeComponent) error
return err
}
}
err = dataNode.SetDataCoord(cluster.dataCoord)
err = dataNode.SetDataCoord(cluster.DataCoord)
if err != nil {
return err
}
err = dataNode.SetRootCoord(cluster.rootCoord)
err = dataNode.SetRootCoord(cluster.RootCoord)
if err != nil {
return err
}
@ -948,7 +948,7 @@ func (cluster *MiniCluster) AddDataNode(dataNode types.DataNodeComponent) error
if err != nil {
return err
}
cluster.dataNodes = append(cluster.dataNodes, dataNode)
cluster.DataNodes = append(cluster.DataNodes, dataNode)
cluster.clusterConfig.DataNodeNum = cluster.clusterConfig.DataNodeNum + 1
log.Debug("mini cluster AddDataNode succeed")
return nil
@ -962,9 +962,9 @@ func (cluster *MiniCluster) RemoveDataNode(dataNode types.DataNodeComponent) err
if dataNode == nil {
// choose a node randomly
if len(cluster.dataNodes) > 0 {
randIndex := rand.Intn(len(cluster.dataNodes))
dataNode = cluster.dataNodes[randIndex]
if len(cluster.DataNodes) > 0 {
randIndex := rand.Intn(len(cluster.DataNodes))
dataNode = cluster.DataNodes[randIndex]
} else {
log.Debug("mini cluster has no dataNodes")
return nil
@ -977,13 +977,13 @@ func (cluster *MiniCluster) RemoveDataNode(dataNode types.DataNodeComponent) err
}
newDataNodes := make([]types.DataNodeComponent, 0)
for _, dn := range cluster.dataNodes {
for _, dn := range cluster.DataNodes {
if dn == dataNode {
continue
}
newDataNodes = append(newDataNodes, dn)
}
cluster.dataNodes = newDataNodes
cluster.DataNodes = newDataNodes
cluster.clusterConfig.DataNodeNum = cluster.clusterConfig.DataNodeNum - 1
log.Debug("mini cluster RemoveDataNode succeed")
return nil
@ -1014,7 +1014,7 @@ func (cluster *MiniCluster) AddQueryNode(queryNode types.QueryNodeComponent) err
if err != nil {
return err
}
cluster.queryNodes = append(cluster.queryNodes, queryNode)
cluster.QueryNodes = append(cluster.QueryNodes, queryNode)
cluster.clusterConfig.QueryNodeNum = cluster.clusterConfig.QueryNodeNum + 1
log.Debug("mini cluster AddQueryNode succeed")
return nil
@ -1028,9 +1028,9 @@ func (cluster *MiniCluster) RemoveQueryNode(queryNode types.QueryNodeComponent)
if queryNode == nil {
// choose a node randomly
if len(cluster.queryNodes) > 0 {
randIndex := rand.Intn(len(cluster.queryNodes))
queryNode = cluster.queryNodes[randIndex]
if len(cluster.QueryNodes) > 0 {
randIndex := rand.Intn(len(cluster.QueryNodes))
queryNode = cluster.QueryNodes[randIndex]
} else {
log.Debug("mini cluster has no queryNodes")
return nil
@ -1043,13 +1043,13 @@ func (cluster *MiniCluster) RemoveQueryNode(queryNode types.QueryNodeComponent)
}
newQueryNodes := make([]types.QueryNodeComponent, 0)
for _, qn := range cluster.queryNodes {
for _, qn := range cluster.QueryNodes {
if qn == queryNode {
continue
}
newQueryNodes = append(newQueryNodes, qn)
}
cluster.queryNodes = newQueryNodes
cluster.QueryNodes = newQueryNodes
cluster.clusterConfig.QueryNodeNum = cluster.clusterConfig.QueryNodeNum - 1
log.Debug("mini cluster RemoveQueryNode succeed")
return nil
@ -1080,7 +1080,7 @@ func (cluster *MiniCluster) AddIndexNode(indexNode types.IndexNodeComponent) err
if err != nil {
return err
}
cluster.indexNodes = append(cluster.indexNodes, indexNode)
cluster.IndexNodes = append(cluster.IndexNodes, indexNode)
cluster.clusterConfig.IndexNodeNum = cluster.clusterConfig.IndexNodeNum + 1
log.Debug("mini cluster AddIndexNode succeed")
return nil
@ -1094,9 +1094,9 @@ func (cluster *MiniCluster) RemoveIndexNode(indexNode types.IndexNodeComponent)
if indexNode == nil {
// choose a node randomly
if len(cluster.indexNodes) > 0 {
randIndex := rand.Intn(len(cluster.indexNodes))
indexNode = cluster.indexNodes[randIndex]
if len(cluster.IndexNodes) > 0 {
randIndex := rand.Intn(len(cluster.IndexNodes))
indexNode = cluster.IndexNodes[randIndex]
} else {
log.Debug("mini cluster has no queryNodes")
return nil
@ -1109,13 +1109,13 @@ func (cluster *MiniCluster) RemoveIndexNode(indexNode types.IndexNodeComponent)
}
newIndexNodes := make([]types.IndexNodeComponent, 0)
for _, in := range cluster.indexNodes {
for _, in := range cluster.IndexNodes {
if in == indexNode {
continue
}
newIndexNodes = append(newIndexNodes, in)
}
cluster.indexNodes = newIndexNodes
cluster.IndexNodes = newIndexNodes
cluster.clusterConfig.IndexNodeNum = cluster.clusterConfig.IndexNodeNum - 1
log.Debug("mini cluster RemoveIndexNode succeed")
return nil
@ -1129,46 +1129,46 @@ func (cluster *MiniCluster) UpdateClusterSize(clusterConfig ClusterConfig) error
// todo concurrent concerns
//cluster.mu.Lock()
//defer cluster.mu.Unlock()
if clusterConfig.DataNodeNum > len(cluster.dataNodes) {
needAdd := clusterConfig.DataNodeNum - len(cluster.dataNodes)
if clusterConfig.DataNodeNum > len(cluster.DataNodes) {
needAdd := clusterConfig.DataNodeNum - len(cluster.DataNodes)
for i := 0; i < needAdd; i++ {
cluster.AddDataNode(nil)
}
} else if clusterConfig.DataNodeNum < len(cluster.dataNodes) {
needRemove := len(cluster.dataNodes) - clusterConfig.DataNodeNum
} else if clusterConfig.DataNodeNum < len(cluster.DataNodes) {
needRemove := len(cluster.DataNodes) - clusterConfig.DataNodeNum
for i := 0; i < needRemove; i++ {
cluster.RemoveDataNode(nil)
}
}
if clusterConfig.QueryNodeNum > len(cluster.queryNodes) {
needAdd := clusterConfig.QueryNodeNum - len(cluster.queryNodes)
if clusterConfig.QueryNodeNum > len(cluster.QueryNodes) {
needAdd := clusterConfig.QueryNodeNum - len(cluster.QueryNodes)
for i := 0; i < needAdd; i++ {
cluster.AddQueryNode(nil)
}
} else if clusterConfig.QueryNodeNum < len(cluster.queryNodes) {
needRemove := len(cluster.queryNodes) - clusterConfig.QueryNodeNum
} else if clusterConfig.QueryNodeNum < len(cluster.QueryNodes) {
needRemove := len(cluster.QueryNodes) - clusterConfig.QueryNodeNum
for i := 0; i < needRemove; i++ {
cluster.RemoveQueryNode(nil)
}
}
if clusterConfig.IndexNodeNum > len(cluster.indexNodes) {
needAdd := clusterConfig.IndexNodeNum - len(cluster.indexNodes)
if clusterConfig.IndexNodeNum > len(cluster.IndexNodes) {
needAdd := clusterConfig.IndexNodeNum - len(cluster.IndexNodes)
for i := 0; i < needAdd; i++ {
cluster.AddIndexNode(nil)
}
} else if clusterConfig.IndexNodeNum < len(cluster.indexNodes) {
needRemove := len(cluster.indexNodes) - clusterConfig.IndexNodeNum
} else if clusterConfig.IndexNodeNum < len(cluster.IndexNodes) {
needRemove := len(cluster.IndexNodes) - clusterConfig.IndexNodeNum
for i := 0; i < needRemove; i++ {
cluster.RemoveIndexNode(nil)
}
}
// validate
if clusterConfig.DataNodeNum != len(cluster.dataNodes) ||
clusterConfig.QueryNodeNum != len(cluster.queryNodes) ||
clusterConfig.IndexNodeNum != len(cluster.indexNodes) {
if clusterConfig.DataNodeNum != len(cluster.DataNodes) ||
clusterConfig.QueryNodeNum != len(cluster.QueryNodes) ||
clusterConfig.IndexNodeNum != len(cluster.IndexNodes) {
return errors.New("Fail to update cluster size to target size")
}
@ -1177,14 +1177,14 @@ func (cluster *MiniCluster) UpdateClusterSize(clusterConfig ClusterConfig) error
}
func (cluster *MiniCluster) GetProxy(ctx context.Context, addr string) (types.Proxy, error) {
if cluster.proxy.GetAddress() == addr {
return cluster.proxy, nil
if cluster.Proxy.GetAddress() == addr {
return cluster.Proxy, nil
}
return nil, nil
}
func (cluster *MiniCluster) GetQueryNode(ctx context.Context, addr string) (types.QueryNode, error) {
for _, queryNode := range cluster.queryNodes {
for _, queryNode := range cluster.QueryNodes {
if queryNode.GetAddress() == addr {
return queryNode, nil
}
@ -1193,7 +1193,7 @@ func (cluster *MiniCluster) GetQueryNode(ctx context.Context, addr string) (type
}
func (cluster *MiniCluster) GetDataNode(ctx context.Context, addr string) (types.DataNode, error) {
for _, dataNode := range cluster.dataNodes {
for _, dataNode := range cluster.DataNodes {
if dataNode.GetAddress() == addr {
return dataNode, nil
}
@ -1202,7 +1202,7 @@ func (cluster *MiniCluster) GetDataNode(ctx context.Context, addr string) (types
}
func (cluster *MiniCluster) GetIndexNode(ctx context.Context, addr string) (types.IndexNode, error) {
for _, indexNode := range cluster.indexNodes {
for _, indexNode := range cluster.IndexNodes {
if indexNode.GetAddress() == addr {
return indexNode, nil
}
@ -1211,5 +1211,5 @@ func (cluster *MiniCluster) GetIndexNode(ctx context.Context, addr string) (type
}
func (cluster *MiniCluster) GetMetaWatcher() MetaWatcher {
return cluster.metaWatcher
return cluster.MetaWatcher
}

View File

@ -41,33 +41,33 @@ func (s *MiniClusterMethodsSuite) TestRemoveDataNode() {
defer cancel()
datanode := datanode.NewDataNode(ctx, c.factory)
datanode.SetEtcdClient(c.etcdCli)
datanode.SetEtcdClient(c.EtcdCli)
//datanode := c.CreateDefaultDataNode()
err := c.AddDataNode(datanode)
s.NoError(err)
s.Equal(2, c.clusterConfig.DataNodeNum)
s.Equal(2, len(c.dataNodes))
s.Equal(2, len(c.DataNodes))
err = c.RemoveDataNode(datanode)
s.NoError(err)
s.Equal(1, c.clusterConfig.DataNodeNum)
s.Equal(1, len(c.dataNodes))
s.Equal(1, len(c.DataNodes))
// add default node and remove randomly
err = c.AddDataNode(nil)
s.NoError(err)
s.Equal(2, c.clusterConfig.DataNodeNum)
s.Equal(2, len(c.dataNodes))
s.Equal(2, len(c.DataNodes))
err = c.RemoveDataNode(nil)
s.NoError(err)
s.Equal(1, c.clusterConfig.DataNodeNum)
s.Equal(1, len(c.dataNodes))
s.Equal(1, len(c.DataNodes))
}
func (s *MiniClusterMethodsSuite) TestRemoveQueryNode() {
@ -76,33 +76,33 @@ func (s *MiniClusterMethodsSuite) TestRemoveQueryNode() {
defer cancel()
queryNode := querynodev2.NewQueryNode(ctx, c.factory)
queryNode.SetEtcdClient(c.etcdCli)
queryNode.SetEtcdClient(c.EtcdCli)
//queryNode := c.CreateDefaultQueryNode()
err := c.AddQueryNode(queryNode)
s.NoError(err)
s.Equal(2, c.clusterConfig.QueryNodeNum)
s.Equal(2, len(c.queryNodes))
s.Equal(2, len(c.QueryNodes))
err = c.RemoveQueryNode(queryNode)
s.NoError(err)
s.Equal(1, c.clusterConfig.QueryNodeNum)
s.Equal(1, len(c.queryNodes))
s.Equal(1, len(c.QueryNodes))
// add default node and remove randomly
err = c.AddQueryNode(nil)
s.NoError(err)
s.Equal(2, c.clusterConfig.QueryNodeNum)
s.Equal(2, len(c.queryNodes))
s.Equal(2, len(c.QueryNodes))
err = c.RemoveQueryNode(nil)
s.NoError(err)
s.Equal(1, c.clusterConfig.QueryNodeNum)
s.Equal(1, len(c.queryNodes))
s.Equal(1, len(c.QueryNodes))
}
@ -112,33 +112,33 @@ func (s *MiniClusterMethodsSuite) TestRemoveIndexNode() {
defer cancel()
indexNode := indexnode.NewIndexNode(ctx, c.factory)
indexNode.SetEtcdClient(c.etcdCli)
indexNode.SetEtcdClient(c.EtcdCli)
//indexNode := c.CreateDefaultIndexNode()
err := c.AddIndexNode(indexNode)
s.NoError(err)
s.Equal(2, c.clusterConfig.IndexNodeNum)
s.Equal(2, len(c.indexNodes))
s.Equal(2, len(c.IndexNodes))
err = c.RemoveIndexNode(indexNode)
s.NoError(err)
s.Equal(1, c.clusterConfig.IndexNodeNum)
s.Equal(1, len(c.indexNodes))
s.Equal(1, len(c.IndexNodes))
// add default node and remove randomly
err = c.AddIndexNode(nil)
s.NoError(err)
s.Equal(2, c.clusterConfig.IndexNodeNum)
s.Equal(2, len(c.indexNodes))
s.Equal(2, len(c.IndexNodes))
err = c.RemoveIndexNode(nil)
s.NoError(err)
s.Equal(1, c.clusterConfig.IndexNodeNum)
s.Equal(1, len(c.indexNodes))
s.Equal(1, len(c.IndexNodes))
}
@ -164,9 +164,9 @@ func (s *MiniClusterMethodsSuite) TestUpdateClusterSize() {
s.Equal(2, c.clusterConfig.QueryNodeNum)
s.Equal(2, c.clusterConfig.IndexNodeNum)
s.Equal(2, len(c.dataNodes))
s.Equal(2, len(c.queryNodes))
s.Equal(2, len(c.indexNodes))
s.Equal(2, len(c.DataNodes))
s.Equal(2, len(c.QueryNodes))
s.Equal(2, len(c.IndexNodes))
err = c.UpdateClusterSize(ClusterConfig{
DataNodeNum: 3,
@ -179,9 +179,9 @@ func (s *MiniClusterMethodsSuite) TestUpdateClusterSize() {
s.Equal(2, c.clusterConfig.QueryNodeNum)
s.Equal(1, c.clusterConfig.IndexNodeNum)
s.Equal(3, len(c.dataNodes))
s.Equal(2, len(c.queryNodes))
s.Equal(1, len(c.indexNodes))
s.Equal(3, len(c.DataNodes))
s.Equal(2, len(c.QueryNodes))
s.Equal(1, len(c.IndexNodes))
}
func TestMiniCluster(t *testing.T) {

View File

@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
package rangesearch
import (
"context"
@ -32,10 +32,11 @@ import (
"github.com/milvus-io/milvus/pkg/util/distance"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/tests/integration"
)
type RangeSearchSuite struct {
MiniClusterSuite
integration.MiniClusterSuite
}
func (s *RangeSearchSuite) TestRangeSearchIP() {
@ -49,11 +50,11 @@ func (s *RangeSearchSuite) TestRangeSearchIP() {
dim := 128
rowNum := 3000
schema := constructSchema(collectionName, dim, true)
schema := integration.ConstructSchema(collectionName, dim, true)
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
@ -67,14 +68,14 @@ func (s *RangeSearchSuite) TestRangeSearchIP() {
}
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
s.NoError(err)
s.True(merr.Ok(showCollectionsResp.GetStatus()))
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
hashKeys := integration.GenerateHashKeys(rowNum)
insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
@ -85,7 +86,7 @@ func (s *RangeSearchSuite) TestRangeSearchIP() {
s.True(merr.Ok(insertResult.GetStatus()))
// flush
flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
@ -95,30 +96,30 @@ func (s *RangeSearchSuite) TestRangeSearchIP() {
ids := segmentIDs.GetData()
s.NotEmpty(segmentIDs)
segments, err := c.metaWatcher.ShowSegments()
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
waitingForFlush(ctx, c, ids)
s.WaitForFlush(ctx, ids)
// create index
createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: floatVecField,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.IP),
ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.IP),
})
s.NoError(err)
err = merr.Error(createIndexStatus)
if err != nil {
log.Warn("createIndexStatus fail reason", zap.Error(err))
}
waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
// load
loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
})
@ -127,23 +128,23 @@ func (s *RangeSearchSuite) TestRangeSearchIP() {
if err != nil {
log.Warn("LoadCollection fail reason", zap.Error(err))
}
waitingForLoad(ctx, c, collectionName)
s.WaitForLoad(ctx, collectionName)
// search
expr := fmt.Sprintf("%s > 0", int64Field)
expr := fmt.Sprintf("%s > 0", integration.Int64Field)
nq := 10
topk := 10
roundDecimal := -1
radius := 10
filter := 20
params := getSearchParams(IndexFaissIvfFlat, distance.IP)
params := integration.GetSearchParams(integration.IndexFaissIvfFlat, distance.IP)
// only pass in radius when range search
params["radius"] = radius
searchReq := constructSearchRequest("", collectionName, expr,
floatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal)
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal)
searchResult, _ := c.proxy.Search(ctx, searchReq)
searchResult, _ := c.Proxy.Search(ctx, searchReq)
err = merr.Error(searchResult.GetStatus())
if err != nil {
@ -153,10 +154,10 @@ func (s *RangeSearchSuite) TestRangeSearchIP() {
// pass in radius and range_filter when range search
params["range_filter"] = filter
searchReq = constructSearchRequest("", collectionName, expr,
floatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal)
searchReq = integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal)
searchResult, _ = c.proxy.Search(ctx, searchReq)
searchResult, _ = c.Proxy.Search(ctx, searchReq)
err = merr.Error(searchResult.GetStatus())
if err != nil {
@ -167,10 +168,10 @@ func (s *RangeSearchSuite) TestRangeSearchIP() {
// pass in illegal radius and range_filter when range search
params["radius"] = filter
params["range_filter"] = radius
searchReq = constructSearchRequest("", collectionName, expr,
floatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal)
searchReq = integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal)
searchResult, _ = c.proxy.Search(ctx, searchReq)
searchResult, _ = c.Proxy.Search(ctx, searchReq)
err = merr.Error(searchResult.GetStatus())
if err != nil {
@ -197,11 +198,11 @@ func (s *RangeSearchSuite) TestRangeSearchL2() {
dim := 128
rowNum := 3000
schema := constructSchema(collectionName, dim, true)
schema := integration.ConstructSchema(collectionName, dim, true)
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
@ -215,14 +216,14 @@ func (s *RangeSearchSuite) TestRangeSearchL2() {
}
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
s.NoError(err)
s.True(merr.Ok(showCollectionsResp.GetStatus()))
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
insertResult, err := c.proxy.Insert(ctx, &milvuspb.InsertRequest{
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
hashKeys := integration.GenerateHashKeys(rowNum)
insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
@ -233,7 +234,7 @@ func (s *RangeSearchSuite) TestRangeSearchL2() {
s.True(merr.Ok(insertResult.GetStatus()))
// flush
flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
@ -243,30 +244,30 @@ func (s *RangeSearchSuite) TestRangeSearchL2() {
ids := segmentIDs.GetData()
s.NotEmpty(segmentIDs)
segments, err := c.metaWatcher.ShowSegments()
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
waitingForFlush(ctx, c, ids)
s.WaitForFlush(ctx, ids)
// create index
createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: floatVecField,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.L2),
ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.L2),
})
s.NoError(err)
err = merr.Error(createIndexStatus)
if err != nil {
log.Warn("createIndexStatus fail reason", zap.Error(err))
}
waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
// load
loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
})
@ -275,22 +276,22 @@ func (s *RangeSearchSuite) TestRangeSearchL2() {
if err != nil {
log.Warn("LoadCollection fail reason", zap.Error(err))
}
waitingForLoad(ctx, c, collectionName)
s.WaitForLoad(ctx, collectionName)
// search
expr := fmt.Sprintf("%s > 0", int64Field)
expr := fmt.Sprintf("%s > 0", integration.Int64Field)
nq := 10
topk := 10
roundDecimal := -1
radius := 20
filter := 10
params := getSearchParams(IndexFaissIvfFlat, distance.L2)
params := integration.GetSearchParams(integration.IndexFaissIvfFlat, distance.L2)
// only pass in radius when range search
params["radius"] = radius
searchReq := constructSearchRequest("", collectionName, expr,
floatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal)
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal)
searchResult, _ := c.proxy.Search(ctx, searchReq)
searchResult, _ := c.Proxy.Search(ctx, searchReq)
err = merr.Error(searchResult.GetStatus())
if err != nil {
@ -300,10 +301,10 @@ func (s *RangeSearchSuite) TestRangeSearchL2() {
// pass in radius and range_filter when range search
params["range_filter"] = filter
searchReq = constructSearchRequest("", collectionName, expr,
floatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal)
searchReq = integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal)
searchResult, _ = c.proxy.Search(ctx, searchReq)
searchResult, _ = c.Proxy.Search(ctx, searchReq)
err = merr.Error(searchResult.GetStatus())
if err != nil {
@ -314,10 +315,10 @@ func (s *RangeSearchSuite) TestRangeSearchL2() {
// pass in illegal radius and range_filter when range search
params["radius"] = filter
params["range_filter"] = radius
searchReq = constructSearchRequest("", collectionName, expr,
floatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal)
searchReq = integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.L2, params, nq, dim, topk, roundDecimal)
searchResult, _ = c.proxy.Search(ctx, searchReq)
searchResult, _ = c.Proxy.Search(ctx, searchReq)
err = merr.Error(searchResult.GetStatus())
if err != nil {

View File

@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
package refreshconfig
import (
"context"
@ -29,12 +29,13 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/distance"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/tests/integration"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
)
type RefreshConfigSuite struct {
MiniClusterSuite
integration.MiniClusterSuite
}
func (s *RefreshConfigSuite) TestRefreshPasswordLength() {
@ -42,7 +43,7 @@ func (s *RefreshConfigSuite) TestRefreshPasswordLength() {
ctx, cancel := context.WithCancel(c.GetContext())
defer cancel()
resp, err := c.proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{
resp, err := c.Proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{
Username: "test",
Password: "1234",
})
@ -52,10 +53,10 @@ func (s *RefreshConfigSuite) TestRefreshPasswordLength() {
params := paramtable.Get()
key := fmt.Sprintf("%s/config/proxy/minpasswordlength", params.EtcdCfg.RootPath.GetValue())
c.etcdCli.KV.Put(ctx, key, "3")
c.EtcdCli.KV.Put(ctx, key, "3")
s.Eventually(func() bool {
resp, err = c.proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{
resp, err = c.Proxy.CreateCredential(ctx, &milvuspb.CreateCredentialRequest{
Username: "test",
Password: "1234",
})
@ -70,7 +71,7 @@ func (s *RefreshConfigSuite) TestRefreshDefaultIndexName() {
ctx, cancel := context.WithCancel(c.GetContext())
defer cancel()
params := paramtable.Get()
c.etcdCli.KV.Put(ctx, fmt.Sprintf("%s/config/common/defaultIndexName", params.EtcdCfg.RootPath.GetValue()), "a_index")
c.EtcdCli.KV.Put(ctx, fmt.Sprintf("%s/config/common/defaultIndexName", params.EtcdCfg.RootPath.GetValue()), "a_index")
s.Eventually(func() bool {
return params.CommonCfg.DefaultIndexName.GetValue() == "a_index"
@ -81,11 +82,11 @@ func (s *RefreshConfigSuite) TestRefreshDefaultIndexName() {
collectionName := "test"
rowNum := 100
schema := constructSchema("test", 128, true)
schema := integration.ConstructSchema("test", 128, true)
marshaledSchema, err := proto.Marshal(schema)
s.Require().NoError(err)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: "default",
CollectionName: "test",
Schema: marshaledSchema,
@ -97,9 +98,9 @@ func (s *RefreshConfigSuite) TestRefreshDefaultIndexName() {
}
s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
_, err = c.proxy.Insert(ctx, &milvuspb.InsertRequest{
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
hashKeys := integration.GenerateHashKeys(rowNum)
_, err = c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{fVecColumn},
@ -108,14 +109,27 @@ func (s *RefreshConfigSuite) TestRefreshDefaultIndexName() {
})
s.NoError(err)
_, err = c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
// flush
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
s.NoError(err)
segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
s.True(has)
ids := segmentIDs.GetData()
s.NotEmpty(segmentIDs)
s.WaitForFlush(ctx, ids)
_, err = c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: floatVecField,
ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.L2),
FieldName: integration.FloatVecField,
ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.L2),
})
s.NoError(err)
resp, err := c.proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
resp, err := c.Proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
DbName: dbName,
CollectionName: collectionName,
})

View File

@ -18,6 +18,7 @@ package integration
import (
"context"
"math/rand"
"os"
"strings"
"time"
@ -64,6 +65,7 @@ type MiniClusterSuite struct {
}
func (s *MiniClusterSuite) SetupSuite() {
rand.Seed(time.Now().UnixNano())
s.Require().NoError(s.SetupEmbedEtcd())
}

View File

@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
package upsert
import (
"context"
@ -29,12 +29,13 @@ import (
"github.com/milvus-io/milvus/pkg/util/distance"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/tests/integration"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
)
type UpsertSuite struct {
MiniClusterSuite
integration.MiniClusterSuite
}
func (s *UpsertSuite) TestUpsert() {
@ -48,11 +49,11 @@ func (s *UpsertSuite) TestUpsert() {
dim := 128
rowNum := 3000
schema := constructSchema(collectionName, dim, false)
schema := integration.ConstructSchema(collectionName, dim, false)
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
createCollectionStatus, err := c.proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
@ -66,15 +67,15 @@ func (s *UpsertSuite) TestUpsert() {
}
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
showCollectionsResp, err := c.proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
s.NoError(err)
s.True(merr.Ok(showCollectionsResp.GetStatus()))
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
pkFieldData := newInt64FieldData(int64Field, rowNum)
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
hashKeys := generateHashKeys(rowNum)
upsertResult, err := c.proxy.Upsert(ctx, &milvuspb.UpsertRequest{
pkFieldData := integration.NewInt64FieldData(integration.Int64Field, rowNum)
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
hashKeys := integration.GenerateHashKeys(rowNum)
upsertResult, err := c.Proxy.Upsert(ctx, &milvuspb.UpsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn},
@ -85,7 +86,7 @@ func (s *UpsertSuite) TestUpsert() {
s.True(merr.Ok(upsertResult.GetStatus()))
// flush
flushResp, err := c.proxy.Flush(ctx, &milvuspb.FlushRequest{
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
@ -95,20 +96,20 @@ func (s *UpsertSuite) TestUpsert() {
ids := segmentIDs.GetData()
s.NotEmpty(segmentIDs)
segments, err := c.metaWatcher.ShowSegments()
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
waitingForFlush(ctx, c, ids)
s.WaitForFlush(ctx, ids)
// create index
createIndexStatus, err := c.proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: floatVecField,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: constructIndexParam(dim, IndexFaissIvfFlat, distance.IP),
ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, distance.IP),
})
s.NoError(err)
err = merr.Error(createIndexStatus)
@ -116,10 +117,10 @@ func (s *UpsertSuite) TestUpsert() {
log.Warn("createIndexStatus fail reason", zap.Error(err))
}
waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField)
s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
// load
loadStatus, err := c.proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
})
@ -128,18 +129,18 @@ func (s *UpsertSuite) TestUpsert() {
if err != nil {
log.Warn("LoadCollection fail reason", zap.Error(err))
}
waitingForLoad(ctx, c, collectionName)
s.WaitForLoad(ctx, collectionName)
// search
expr := fmt.Sprintf("%s > 0", int64Field)
expr := fmt.Sprintf("%s > 0", integration.Int64Field)
nq := 10
topk := 10
roundDecimal := -1
params := getSearchParams(IndexFaissIvfFlat, "")
searchReq := constructSearchRequest("", collectionName, expr,
floatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal)
params := integration.GetSearchParams(integration.IndexFaissIvfFlat, "")
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, schemapb.DataType_FloatVector, nil, distance.IP, params, nq, dim, topk, roundDecimal)
searchResult, _ := c.proxy.Search(ctx, searchReq)
searchResult, _ := c.Proxy.Search(ctx, searchReq)
err = merr.Error(searchResult.GetStatus())
if err != nil {

View File

@ -42,9 +42,41 @@ const (
IndexDISKANN = indexparamcheck.IndexDISKANN
)
// WaitForIndexBuilt blocks until the index on the given collection/field
// reaches IndexState_Finished, polling DescribeIndex every 500ms.
// The suite fails immediately when DescribeIndex errors, when the index
// build reports Failed, or when ctx is done — a failed build never
// recovers, so spinning until the context deadline only wastes CI time.
func (s *MiniClusterSuite) WaitForIndexBuilt(ctx context.Context, collection, field string) {
	getIndexBuilt := func() bool {
		resp, err := s.Cluster.Proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
			CollectionName: collection,
			FieldName:      field,
		})
		if err != nil {
			s.FailNow("failed to describe index")
			return true
		}
		for _, desc := range resp.GetIndexDescriptions() {
			if desc.GetFieldName() == field {
				switch desc.GetState() {
				case commonpb.IndexState_Finished:
					return true
				case commonpb.IndexState_Failed:
					// Fail fast: a Failed index will not transition to
					// Finished, so retrying is pointless.
					s.FailNow("index build failed")
					return true
				}
			}
		}
		return false
	}
	for !getIndexBuilt() {
		select {
		case <-ctx.Done():
			s.FailNow("failed to wait index built until ctx done")
			return
		case <-time.After(500 * time.Millisecond):
		}
	}
}
func waitingForIndexBuilt(ctx context.Context, cluster *MiniCluster, t *testing.T, collection, field string) {
getIndexBuilt := func() bool {
resp, err := cluster.proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
resp, err := cluster.Proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
CollectionName: collection,
FieldName: field,
})
@ -74,7 +106,7 @@ func waitingForIndexBuilt(ctx context.Context, cluster *MiniCluster, t *testing.
}
}
func constructIndexParam(dim int, indexType string, metricType string) []*commonpb.KeyValuePair {
func ConstructIndexParam(dim int, indexType string, metricType string) []*commonpb.KeyValuePair {
params := []*commonpb.KeyValuePair{
{
Key: common.DimKey,
@ -126,7 +158,7 @@ func constructIndexParam(dim int, indexType string, metricType string) []*common
return params
}
func getSearchParams(indexType string, metricType string) map[string]any {
func GetSearchParams(indexType string, metricType string) map[string]any {
params := make(map[string]any)
switch indexType {
case IndexFaissIDMap, IndexFaissBinIDMap:

View File

@ -26,9 +26,30 @@ import (
"github.com/milvus-io/milvus-proto/go-api/schemapb"
)
// WaitForFlush blocks until every segment in segIDs is reported flushed
// by the proxy, polling GetFlushState every 500ms.
// Transient RPC errors are treated as "not flushed yet" and retried;
// the suite fails when ctx is cancelled before the flush completes.
func (s *MiniClusterSuite) WaitForFlush(ctx context.Context, segIDs []int64) {
	flushed := func() bool {
		resp, err := s.Cluster.Proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{
			SegmentIDs: segIDs,
		})
		if err != nil {
			// Best-effort poll: retry on the next tick.
			return false
		}
		return resp.GetFlushed()
	}
	for !flushed() {
		select {
		case <-ctx.Done():
			s.FailNow("failed to wait for flush until ctx done")
			return
		case <-time.After(500 * time.Millisecond):
			// Waiting inside the select (instead of default + Sleep)
			// reacts to cancellation immediately and matches the
			// pattern used by WaitForIndexBuilt.
		}
	}
}
func waitingForFlush(ctx context.Context, cluster *MiniCluster, segIDs []int64) {
flushed := func() bool {
resp, err := cluster.proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{
resp, err := cluster.Proxy.GetFlushState(ctx, &milvuspb.GetFlushStateRequest{
SegmentIDs: segIDs,
})
if err != nil {
@ -46,7 +67,7 @@ func waitingForFlush(ctx context.Context, cluster *MiniCluster, segIDs []int64)
}
}
func newInt64FieldData(fieldName string, numRows int) *schemapb.FieldData {
func NewInt64FieldData(fieldName string, numRows int) *schemapb.FieldData {
return &schemapb.FieldData{
Type: schemapb.DataType_Int64,
FieldName: fieldName,
@ -54,7 +75,7 @@ func newInt64FieldData(fieldName string, numRows int) *schemapb.FieldData {
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: generateInt64Array(numRows),
Data: GenerateInt64Array(numRows),
},
},
},
@ -62,7 +83,7 @@ func newInt64FieldData(fieldName string, numRows int) *schemapb.FieldData {
}
}
func newStringFieldData(fieldName string, numRows int) *schemapb.FieldData {
func NewStringFieldData(fieldName string, numRows int) *schemapb.FieldData {
return &schemapb.FieldData{
Type: schemapb.DataType_Int64,
FieldName: fieldName,
@ -70,7 +91,7 @@ func newStringFieldData(fieldName string, numRows int) *schemapb.FieldData {
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Data: generateStringArray(numRows),
Data: GenerateStringArray(numRows),
},
},
},
@ -78,7 +99,7 @@ func newStringFieldData(fieldName string, numRows int) *schemapb.FieldData {
}
}
func newFloatVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData {
func NewFloatVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData {
return &schemapb.FieldData{
Type: schemapb.DataType_FloatVector,
FieldName: fieldName,
@ -87,7 +108,7 @@ func newFloatVectorFieldData(fieldName string, numRows, dim int) *schemapb.Field
Dim: int64(dim),
Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{
Data: generateFloatVectors(numRows, dim),
Data: GenerateFloatVectors(numRows, dim),
},
},
},
@ -95,7 +116,7 @@ func newFloatVectorFieldData(fieldName string, numRows, dim int) *schemapb.Field
}
}
func newBinaryVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData {
func NewBinaryVectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData {
return &schemapb.FieldData{
Type: schemapb.DataType_BinaryVector,
FieldName: fieldName,
@ -103,14 +124,14 @@ func newBinaryVectorFieldData(fieldName string, numRows, dim int) *schemapb.Fiel
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_BinaryVector{
BinaryVector: generateBinaryVectors(numRows, dim),
BinaryVector: GenerateBinaryVectors(numRows, dim),
},
},
},
}
}
func generateInt64Array(numRows int) []int64 {
func GenerateInt64Array(numRows int) []int64 {
ret := make([]int64, numRows)
for i := 0; i < numRows; i++ {
ret[i] = int64(i)
@ -118,7 +139,7 @@ func generateInt64Array(numRows int) []int64 {
return ret
}
func generateStringArray(numRows int) []string {
func GenerateStringArray(numRows int) []string {
ret := make([]string, numRows)
for i := 0; i < numRows; i++ {
ret[i] = fmt.Sprintf("%d", i)
@ -126,7 +147,7 @@ func generateStringArray(numRows int) []string {
return ret
}
func generateFloatVectors(numRows, dim int) []float32 {
func GenerateFloatVectors(numRows, dim int) []float32 {
total := numRows * dim
ret := make([]float32, 0, total)
for i := 0; i < total; i++ {
@ -135,7 +156,7 @@ func generateFloatVectors(numRows, dim int) []float32 {
return ret
}
func generateBinaryVectors(numRows, dim int) []byte {
func GenerateBinaryVectors(numRows, dim int) []byte {
total := (numRows * dim) / 8
ret := make([]byte, total)
_, err := rand.Read(ret)
@ -145,7 +166,7 @@ func generateBinaryVectors(numRows, dim int) []byte {
return ret
}
func generateHashKeys(numRows int) []uint32 {
func GenerateHashKeys(numRows int) []uint32 {
ret := make([]uint32, 0, numRows)
for i := 0; i < numRows; i++ {
ret = append(ret, rand.Uint32())

View File

@ -44,9 +44,31 @@ const (
LimitKey = "limit"
)
// WaitForLoad blocks until the collection reports 100% loading progress,
// polling GetLoadingProgress every 500ms.
// The suite fails (via FailNow, instead of panicking the whole test
// binary) on RPC errors, and fails when ctx is cancelled before the
// load completes.
func (s *MiniClusterSuite) WaitForLoad(ctx context.Context, collection string) {
	cluster := s.Cluster
	getLoadingProgress := func() *milvuspb.GetLoadingProgressResponse {
		loadProgress, err := cluster.Proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{
			CollectionName: collection,
		})
		if err != nil {
			// FailNow stops this test goroutine cleanly; a panic here
			// would take down every other suite in the binary.
			s.FailNow("GetLoadingProgress fail")
			return nil
		}
		return loadProgress
	}
	for getLoadingProgress().GetProgress() != 100 {
		select {
		case <-ctx.Done():
			s.FailNow("failed to wait for load")
			return
		case <-time.After(500 * time.Millisecond):
		}
	}
}
func waitingForLoad(ctx context.Context, cluster *MiniCluster, collection string) {
getLoadingProgress := func() *milvuspb.GetLoadingProgressResponse {
loadProgress, err := cluster.proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{
loadProgress, err := cluster.Proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{
CollectionName: collection,
})
if err != nil {
@ -64,7 +86,7 @@ func waitingForLoad(ctx context.Context, cluster *MiniCluster, collection string
}
}
func constructSearchRequest(
func ConstructSearchRequest(
dbName, collectionName string,
expr string,
vecField string,

View File

@ -25,20 +25,20 @@ import (
)
const (
boolField = "boolField"
int8Field = "int8Field"
int16Field = "int16Field"
int32Field = "int32Field"
int64Field = "int64Field"
floatField = "floatField"
doubleField = "doubleField"
varCharField = "varCharField"
jsonField = "jsonField"
floatVecField = "floatVecField"
binVecField = "binVecField"
BoolField = "boolField"
Int8Field = "int8Field"
Int16Field = "int16Field"
Int32Field = "int32Field"
Int64Field = "int64Field"
FloatField = "floatField"
DoubleField = "doubleField"
VarCharField = "varCharField"
JSONField = "jsonField"
FloatVecField = "floatVecField"
BinVecField = "binVecField"
)
func constructSchema(collection string, dim int, autoID bool, fields ...*schemapb.FieldSchema) *schemapb.CollectionSchema {
func ConstructSchema(collection string, dim int, autoID bool, fields ...*schemapb.FieldSchema) *schemapb.CollectionSchema {
// if fields are specified, construct it
if len(fields) > 0 {
return &schemapb.CollectionSchema{
@ -51,7 +51,7 @@ func constructSchema(collection string, dim int, autoID bool, fields ...*schemap
// if no field is specified, use default
pk := &schemapb.FieldSchema{
FieldID: 100,
Name: int64Field,
Name: Int64Field,
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
@ -61,7 +61,7 @@ func constructSchema(collection string, dim int, autoID bool, fields ...*schemap
}
fVec := &schemapb.FieldSchema{
FieldID: 101,
Name: floatVecField,
Name: FloatVecField,
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_FloatVector,