diff --git a/pkg/master/mock/collection.go b/pkg/master/mock/collection.go index e375b6e1c6..ea1a1e28c5 100644 --- a/pkg/master/mock/collection.go +++ b/pkg/master/mock/collection.go @@ -1,30 +1,77 @@ package mock import ( + "fmt" "time" + pb "github.com/czs007/suvlim/pkg/master/grpc/master" + messagepb "github.com/czs007/suvlim/pkg/master/grpc/message" + "github.com/golang/protobuf/proto" + "github.com/google/uuid" jsoniter "github.com/json-iterator/go" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary type Collection struct { - ID uint64 `json:"id"` - Name string `json:"name"` - CreateTime time.Time `json:"creat_time"` - SegmentIDs []uint64 `json:"segment_ids"` - PartitionTags []string `json:"partition_tags"` + ID uint64 `json:"id"` + Name string `json:"name"` + CreateTime uint64 `json:"creat_time"` + Schema []FieldMeta `json:"schema"` + // ExtraSchema []FieldMeta `json:"extra_schema"` + SegmentIDs []uint64 `json:"segment_ids"` + PartitionTags []string `json:"partition_tags"` + GrpcMarshalString string `json:"grpc_marshal_string"` } -func FakeCreateCollection(id uint64) Collection { - cl := Collection{ - ID: id, - Name: "test-collection", - CreateTime: time.Now(), - SegmentIDs: []uint64{uint64(10111)}, - PartitionTags: []string{"default"}, +type FieldMeta struct { + FieldName string `json:"field_name"` + Type string `json:"type"` + DIM int64 `json:"dimension"` +} + +func GrpcMarshal(c *Collection) *Collection { + if c.GrpcMarshalString != "" { + c.GrpcMarshalString = "" + } + pbSchema := &messagepb.Schema{ + FieldMetas: []*messagepb.FieldMeta{}, + } + grpcCollection := &pb.Collection{ + Id: c.ID, + Name: c.Name, + Schema: pbSchema, + CreateTime: c.CreateTime, + SegmentIds: c.SegmentIDs, + PartitionTags: c.PartitionTags, + } + out, err := proto.Marshal(grpcCollection) + if err != nil { + fmt.Println(err) + } + c.GrpcMarshalString = string(out) + return c +} + +func NewCollection(id uuid.UUID, name string, createTime time.Time, + schema []*messagepb.FieldMeta, sIds []uuid.UUID, ptags []string) Collection { + + segementIDs := []uint64{} + newSchema := []FieldMeta{} + for _, v := range schema { + newSchema = append(newSchema, FieldMeta{FieldName: v.FieldName, Type: v.Type.String(), DIM: v.Dim}) + } + for _, sid := range sIds { + segementIDs = append(segementIDs, uint64(sid.ID())) + } + return Collection{ + ID: uint64(id.ID()), + Name: name, + CreateTime: uint64(createTime.Unix()), + Schema: newSchema, + SegmentIDs: segementIDs, + PartitionTags: ptags, } - return cl } func Collection2JSON(c Collection) (string, error) { diff --git a/pkg/master/mock/collection_test.go b/pkg/master/mock/collection_test.go index b37b5f2df9..f40dc605ac 100644 --- a/pkg/master/mock/collection_test.go +++ b/pkg/master/mock/collection_test.go @@ -9,7 +9,7 @@ import ( var C = Collection{ ID: uint64(11111), Name: "test-collection", - CreateTime: time.Now(), + CreateTime: uint64(time.Now().Unix()), SegmentIDs: []uint64{uint64(10111)}, PartitionTags: []string{"default"}, } diff --git a/pkg/master/mock/segment.go b/pkg/master/mock/segment.go index 179a8dd24e..59bf1c66c7 100644 --- a/pkg/master/mock/segment.go +++ b/pkg/master/mock/segment.go @@ -4,6 +4,8 @@ import ( "bytes" "encoding/gob" "time" + + "github.com/google/uuid" ) type SegmentStats struct { @@ -38,10 +40,21 @@ type Segment struct { PartitionTag string `json:"partition_tag"` ChannelStart int `json:"channel_start"` ChannelEnd int `json:"channel_end"` - OpenTimeStamp time.Time `json:"open_timestamp"` - CloseTimeStamp time.Time `json:"clost_timestamp"` + OpenTimeStamp uint64 `json:"open_timestamp"` + CloseTimeStamp uint64 `json:"close_timestamp"` } +func NewSegment(id uuid.UUID, collection Collection, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment { + return Segment{ + SegmentID: uint64(id.ID()), + Collection: collection, + PartitionTag: ptag, + ChannelStart: chStart, + ChannelEnd: chEnd, + OpenTimeStamp: uint64(openTime.Unix()), + CloseTimeStamp: uint64(closeTime.Unix()), + } +} func Segment2JSON(s Segment) (string, error) { b, err := json.Marshal(&s) if err != nil { @@ -59,16 +72,3 @@ func JSON2Segment(s string) (*Segment, error) { } return &c, nil } - -func FakeCreateSegment(id uint64, cl Collection, opentime time.Time, closetime time.Time) Segment { - seg := Segment{ - SegmentID: id, - Collection: cl, - PartitionTag: "default", - ChannelStart: 0, - ChannelEnd: 100, - OpenTimeStamp: opentime, - CloseTimeStamp: closetime, - } - return seg -} diff --git a/pkg/master/mock/segment_test.go b/pkg/master/mock/segment_test.go index 7336413994..610b93aff5 100644 --- a/pkg/master/mock/segment_test.go +++ b/pkg/master/mock/segment_test.go @@ -34,15 +34,15 @@ var Ts = Segment{ Collection: Collection{ ID: uint64(11111), Name: "test-collection", - CreateTime: time.Now(), + CreateTime: uint64(time.Now().Unix()), SegmentIDs: []uint64{uint64(10111)}, PartitionTags: []string{"default"}, }, PartitionTag: "default", ChannelStart: 1, ChannelEnd: 100, - OpenTimeStamp: time.Now(), - CloseTimeStamp: time.Now().Add(1 * time.Hour), + OpenTimeStamp: uint64(time.Now().Unix()), + CloseTimeStamp: uint64(time.Now().Add(1 * time.Hour).Unix()), } func TestSegment2JSON(t *testing.T) { diff --git a/pkg/master/server.go b/pkg/master/server.go index b3579da497..2469015fd5 100644 --- a/pkg/master/server.go +++ b/pkg/master/server.go @@ -68,7 +68,7 @@ func ComputeCloseTime(ss mock.SegmentStats, kvbase kv.Base) error { if err != nil { return err } - seg.CloseTimeStamp = time.Now().Add(time.Duration(sec) * time.Second) + seg.CloseTimeStamp = uint64(time.Now().Add(time.Duration(sec) * time.Second).Unix()) updateData, err := mock.Segment2JSON(*seg) if err != nil { return err @@ -119,25 +119,27 @@ func CollectionController(ch chan *messagepb.Mapping) { defer cli.Close() kvbase := kv.NewEtcdKVBase(cli, common.ETCD_ROOT_PATH) for collection := range ch { - pTag := uuid.New() + sID := uuid.New() cID := uuid.New() - c := mock.Collection{ - Name: collection.CollectionName, - CreateTime: time.Now(), - ID: uint64(cID.ID()), - PartitionTags: []string{pTag.String()}, + fieldMetas := []*messagepb.FieldMeta{} + if collection.Schema != nil { + fieldMetas = collection.Schema.FieldMetas } - s := mock.FakeCreateSegment(uint64(pTag.ID()), c, time.Now(), time.Unix(1<<36-1, 0)) - collectionData, _ := mock.Collection2JSON(c) + c := mock.NewCollection(cID, collection.CollectionName, + time.Now(), fieldMetas, []uuid.UUID{sID}, + []string{"default"}) + cm := mock.GrpcMarshal(&c) + s := mock.NewSegment(sID, c, "default", 0, 100, time.Now(), time.Unix(1<<36-1, 0)) + collectionData, _ := mock.Collection2JSON(*cm) segmentData, err := mock.Segment2JSON(s) if err != nil { log.Fatal(err) } - err = kvbase.Save(cID.String(), collectionData) + err = kvbase.Save("collection/"+cID.String(), collectionData) if err != nil { log.Fatal(err) } - err = kvbase.Save(pTag.String(), segmentData) + err = kvbase.Save("segment/"+sID.String(), segmentData) if err != nil { log.Fatal(err) }