package reader import ( "context" "testing" "time" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) // NOTE: start pulsar before test func TestStatsService_start(t *testing.T) { Params.Init() var ctx context.Context if closeWithDeadline { var cancel context.CancelFunc d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel = context.WithDeadline(context.Background(), d) defer cancel() } else { ctx = context.Background() } // init query node node := NewQueryNode(ctx, 0) // init meta collectionName := "collection0" fieldVec := schemapb.FieldSchema{ Name: "vec", DataType: schemapb.DataType_VECTOR_FLOAT, TypeParams: []*commonpb.KeyValuePair{ { Key: "dim", Value: "16", }, }, } fieldInt := schemapb.FieldSchema{ Name: "age", DataType: schemapb.DataType_INT32, TypeParams: []*commonpb.KeyValuePair{ { Key: "dim", Value: "1", }, }, } schema := schemapb.CollectionSchema{ Name: collectionName, Fields: []*schemapb.FieldSchema{ &fieldVec, &fieldInt, }, } collectionMeta := etcdpb.CollectionMeta{ ID: UniqueID(0), Schema: &schema, CreateTime: Timestamp(0), SegmentIDs: []UniqueID{0}, PartitionTags: []string{"default"}, } collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, (*node.replica).getCollectionNum(), 1) err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) assert.NoError(t, err) segmentID := UniqueID(0) err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) assert.NoError(t, err) // start stats service node.statsService = newStatsService(node.ctx, node.replica) node.statsService.start() } // NOTE: start pulsar before test func TestSegmentManagement_SegmentStatisticService(t *testing.T) { Params.Init() var ctx context.Context if closeWithDeadline { var cancel context.CancelFunc d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel = context.WithDeadline(context.Background(), d) defer cancel() } else { ctx = context.Background() } // init query node pulsarURL, _ := Params.pulsarAddress() node := NewQueryNode(ctx, 0) // init meta collectionName := "collection0" fieldVec := schemapb.FieldSchema{ Name: "vec", DataType: schemapb.DataType_VECTOR_FLOAT, TypeParams: []*commonpb.KeyValuePair{ { Key: "dim", Value: "16", }, }, } fieldInt := schemapb.FieldSchema{ Name: "age", DataType: schemapb.DataType_INT32, TypeParams: []*commonpb.KeyValuePair{ { Key: "dim", Value: "1", }, }, } schema := schemapb.CollectionSchema{ Name: collectionName, Fields: []*schemapb.FieldSchema{ &fieldVec, &fieldInt, }, } collectionMeta := etcdpb.CollectionMeta{ ID: UniqueID(0), Schema: &schema, CreateTime: Timestamp(0), SegmentIDs: []UniqueID{0}, PartitionTags: []string{"default"}, } collectionMetaBlob := proto.MarshalTextString(&collectionMeta) assert.NotEqual(t, "", collectionMetaBlob) var err = (*node.replica).addCollection(&collectionMeta, collectionMetaBlob) assert.NoError(t, err) collection, err := (*node.replica).getCollectionByName(collectionName) assert.NoError(t, err) assert.Equal(t, collection.meta.Schema.Name, "collection0") assert.Equal(t, collection.meta.ID, UniqueID(0)) assert.Equal(t, (*node.replica).getCollectionNum(), 1) err = (*node.replica).addPartition(collection.ID(), collectionMeta.PartitionTags[0]) assert.NoError(t, err) segmentID := UniqueID(0) err = (*node.replica).addSegment(segmentID, collectionMeta.PartitionTags[0], UniqueID(0)) assert.NoError(t, err) const receiveBufSize = 1024 // start pulsar producerChannels := []string{"statistic"} statsStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize) statsStream.SetPulsarClient(pulsarURL) statsStream.CreatePulsarProducers(producerChannels) var statsMsgStream msgstream.MsgStream = statsStream node.statsService = newStatsService(node.ctx, node.replica) node.statsService.statsStream = &statsMsgStream (*node.statsService.statsStream).Start() // send stats node.statsService.sendSegmentStatistic() }