// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package integration import ( "context" "strconv" "testing" "time" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/suite" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/metric" ) type MetaWatcherSuite struct { MiniClusterSuite } func (s *MetaWatcherSuite) TestShowSessions() { sessions, err := s.Cluster.MetaWatcher.ShowSessions() s.NoError(err) s.NotEmpty(sessions) for _, session := range sessions { log.Info("ShowSessions result", zap.String("session", session.String())) } } func (s *MetaWatcherSuite) TestShowSegments() { c := s.Cluster ctx, cancel := context.WithCancel(c.GetContext()) defer cancel() prefix := "TestShowSegments" dbName := "" collectionName := prefix + funcutil.GenRandomStr() int64Field := "int64" floatVecField := "fvec" dim := 128 rowNum := 3000 constructCollectionSchema := func() *schemapb.CollectionSchema { pk := &schemapb.FieldSchema{ FieldID: 0, Name: int64Field, IsPrimaryKey: true, Description: "", DataType: schemapb.DataType_Int64, TypeParams: nil, IndexParams: nil, AutoID: true, } fVec := &schemapb.FieldSchema{ FieldID: 0, Name: floatVecField, IsPrimaryKey: false, Description: "", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ { Key: common.DimKey, Value: strconv.Itoa(dim), }, }, IndexParams: nil, AutoID: false, } return &schemapb.CollectionSchema{ Name: collectionName, Description: "", AutoID: false, Fields: []*schemapb.FieldSchema{ pk, fVec, }, } } schema := constructCollectionSchema() marshaledSchema, err := proto.Marshal(schema) s.NoError(err) createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, ShardsNum: common.DefaultShardsNum, }) s.NoError(err) s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) s.NoError(err) s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) fVecColumn := NewFloatVectorFieldData(floatVecField, rowNum, dim) hashKeys := GenerateHashKeys(rowNum) insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ DbName: dbName, CollectionName: collectionName, FieldsData: []*schemapb.FieldData{fVecColumn}, HashKeys: hashKeys, NumRows: uint32(rowNum), }) s.NoError(err) s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) for _, segment := range segments { log.Info("ShowSegments result", zap.String("segment", segment.String())) } } func (s *MetaWatcherSuite) TestShowReplicas() { c := s.Cluster ctx, cancel := context.WithCancel(c.GetContext()) defer cancel() prefix := "TestShowReplicas" dbName := "" collectionName := prefix + funcutil.GenRandomStr() int64Field := "int64" floatVecField := "fvec" dim := 128 rowNum := 3000 constructCollectionSchema := func() *schemapb.CollectionSchema { pk := &schemapb.FieldSchema{ FieldID: 0, Name: int64Field, IsPrimaryKey: true, Description: "", DataType: schemapb.DataType_Int64, TypeParams: nil, IndexParams: nil, AutoID: true, } fVec := &schemapb.FieldSchema{ FieldID: 0, Name: floatVecField, IsPrimaryKey: false, Description: "", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ { Key: common.DimKey, Value: strconv.Itoa(dim), }, }, IndexParams: nil, AutoID: false, } return &schemapb.CollectionSchema{ Name: collectionName, Description: "", AutoID: false, Fields: []*schemapb.FieldSchema{ pk, fVec, }, } } schema := constructCollectionSchema() marshaledSchema, err := proto.Marshal(schema) s.NoError(err) createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ DbName: dbName, CollectionName: collectionName, Schema: marshaledSchema, ShardsNum: common.DefaultShardsNum, }) s.NoError(err) if createCollectionStatus.GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("createCollectionStatus fail reason", zap.String("reason", createCollectionStatus.GetReason())) } s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success) log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) s.NoError(err) s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) fVecColumn := NewFloatVectorFieldData(floatVecField, rowNum, dim) hashKeys := GenerateHashKeys(rowNum) insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ DbName: dbName, CollectionName: collectionName, FieldsData: []*schemapb.FieldData{fVecColumn}, HashKeys: hashKeys, NumRows: uint32(rowNum), }) s.NoError(err) s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) // flush flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, CollectionNames: []string{collectionName}, }) s.NoError(err) segments, err := c.MetaWatcher.ShowSegments() s.NoError(err) s.NotEmpty(segments) for _, segment := range segments { log.Info("ShowSegments result", zap.String("segment", segment.String())) } segmentIDs, has := flushResp.GetCollSegIDs()[collectionName] ids := segmentIDs.GetData() s.Require().NotEmpty(segmentIDs) s.Require().True(has) flushTs, has := flushResp.GetCollFlushTs()[collectionName] s.True(has) s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) // create index createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collectionName, FieldName: floatVecField, IndexName: "_default", ExtraParams: []*commonpb.KeyValuePair{ { Key: common.DimKey, Value: strconv.Itoa(dim), }, { Key: common.MetricTypeKey, Value: metric.L2, }, { Key: common.IndexTypeKey, Value: "IVF_FLAT", }, { Key: "nlist", Value: strconv.Itoa(10), }, }, }) s.NoError(err) if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason())) } s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode()) waitingForIndexBuilt(ctx, c, s.T(), collectionName, floatVecField) // load loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ DbName: dbName, CollectionName: collectionName, }) s.NoError(err) if loadStatus.GetErrorCode() != commonpb.ErrorCode_Success { log.Warn("loadStatus fail reason", zap.String("reason", loadStatus.GetReason())) } s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode()) for { loadProgress, err := c.Proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{ CollectionName: collectionName, }) if err != nil { panic("GetLoadingProgress fail") } if loadProgress.GetProgress() == 100 { break } time.Sleep(500 * time.Millisecond) } replicas, err := c.MetaWatcher.ShowReplicas() s.NoError(err) s.NotEmpty(replicas) for _, replica := range replicas { log.Info("ShowReplicas result", zap.String("replica", PrettyReplica(replica))) } log.Info("TestShowReplicas succeed") } func TestMetaWatcher(t *testing.T) { t.Skip("Skip integration test, need to refactor integration test framework") suite.Run(t, new(MetaWatcherSuite)) }