mirror of https://github.com/milvus-io/milvus.git
Implement the marshal logic when data to kv store
Signed-off-by: rain <boyan.wang@zilliz.com>pull/4973/head^2
parent
917dc677af
commit
11125bc6eb
|
@ -1,8 +1,13 @@
|
||||||
package mock
|
package mock
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
pb "github.com/czs007/suvlim/pkg/master/grpc/master"
|
||||||
|
messagepb "github.com/czs007/suvlim/pkg/master/grpc/message"
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
"github.com/google/uuid"
|
||||||
jsoniter "github.com/json-iterator/go"
|
jsoniter "github.com/json-iterator/go"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -11,20 +16,62 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||||
type Collection struct {
|
type Collection struct {
|
||||||
ID uint64 `json:"id"`
|
ID uint64 `json:"id"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
CreateTime time.Time `json:"creat_time"`
|
CreateTime uint64 `json:"creat_time"`
|
||||||
|
Schema []FieldMeta `json:"schema"`
|
||||||
|
// ExtraSchema []FieldMeta `json:"extra_schema"`
|
||||||
SegmentIDs []uint64 `json:"segment_ids"`
|
SegmentIDs []uint64 `json:"segment_ids"`
|
||||||
PartitionTags []string `json:"partition_tags"`
|
PartitionTags []string `json:"partition_tags"`
|
||||||
|
GrpcMarshalString string `json:"grpc_marshal_string"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func FakeCreateCollection(id uint64) Collection {
|
type FieldMeta struct {
|
||||||
cl := Collection{
|
FieldName string `json:"field_name"`
|
||||||
ID: id,
|
Type string `json:"type"`
|
||||||
Name: "test-collection",
|
DIM int64 `json:"dimension"`
|
||||||
CreateTime: time.Now(),
|
}
|
||||||
SegmentIDs: []uint64{uint64(10111)},
|
|
||||||
PartitionTags: []string{"default"},
|
func GrpcMarshal(c *Collection) *Collection {
|
||||||
|
if c.GrpcMarshalString != "" {
|
||||||
|
c.GrpcMarshalString = ""
|
||||||
|
}
|
||||||
|
pbSchema := &messagepb.Schema{
|
||||||
|
FieldMetas: []*messagepb.FieldMeta{},
|
||||||
|
}
|
||||||
|
grpcCollection := &pb.Collection{
|
||||||
|
Id: c.ID,
|
||||||
|
Name: c.Name,
|
||||||
|
Schema: pbSchema,
|
||||||
|
CreateTime: c.CreateTime,
|
||||||
|
SegmentIds: c.SegmentIDs,
|
||||||
|
PartitionTags: c.PartitionTags,
|
||||||
|
}
|
||||||
|
out, err := proto.Marshal(grpcCollection)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
}
|
||||||
|
c.GrpcMarshalString = string(out)
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCollection(id uuid.UUID, name string, createTime time.Time,
|
||||||
|
schema []*messagepb.FieldMeta, sIds []uuid.UUID, ptags []string) Collection {
|
||||||
|
|
||||||
|
segementIDs := []uint64{}
|
||||||
|
newSchema := []FieldMeta{}
|
||||||
|
for _, v := range schema {
|
||||||
|
newSchema = append(newSchema, FieldMeta{FieldName: v.FieldName, Type: v.Type.String(), DIM: v.Dim})
|
||||||
|
}
|
||||||
|
for _, sid := range sIds {
|
||||||
|
segementIDs = append(segementIDs, uint64(sid.ID()))
|
||||||
|
}
|
||||||
|
return Collection{
|
||||||
|
ID: uint64(id.ID()),
|
||||||
|
Name: name,
|
||||||
|
CreateTime: uint64(createTime.Unix()),
|
||||||
|
Schema: newSchema,
|
||||||
|
SegmentIDs: segementIDs,
|
||||||
|
PartitionTags: ptags,
|
||||||
}
|
}
|
||||||
return cl
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Collection2JSON(c Collection) (string, error) {
|
func Collection2JSON(c Collection) (string, error) {
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
var C = Collection{
|
var C = Collection{
|
||||||
ID: uint64(11111),
|
ID: uint64(11111),
|
||||||
Name: "test-collection",
|
Name: "test-collection",
|
||||||
CreateTime: time.Now(),
|
CreateTime: uint64(time.Now().Unix()),
|
||||||
SegmentIDs: []uint64{uint64(10111)},
|
SegmentIDs: []uint64{uint64(10111)},
|
||||||
PartitionTags: []string{"default"},
|
PartitionTags: []string{"default"},
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,8 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/gob"
|
"encoding/gob"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SegmentStats struct {
|
type SegmentStats struct {
|
||||||
|
@ -38,10 +40,21 @@ type Segment struct {
|
||||||
PartitionTag string `json:"partition_tag"`
|
PartitionTag string `json:"partition_tag"`
|
||||||
ChannelStart int `json:"channel_start"`
|
ChannelStart int `json:"channel_start"`
|
||||||
ChannelEnd int `json:"channel_end"`
|
ChannelEnd int `json:"channel_end"`
|
||||||
OpenTimeStamp time.Time `json:"open_timestamp"`
|
OpenTimeStamp uint64 `json:"open_timestamp"`
|
||||||
CloseTimeStamp time.Time `json:"clost_timestamp"`
|
CloseTimeStamp uint64 `json:"close_timestamp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewSegment(id uuid.UUID, collection Collection, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment {
|
||||||
|
return Segment{
|
||||||
|
SegmentID: uint64(id.ID()),
|
||||||
|
Collection: collection,
|
||||||
|
PartitionTag: ptag,
|
||||||
|
ChannelStart: chStart,
|
||||||
|
ChannelEnd: chEnd,
|
||||||
|
OpenTimeStamp: uint64(openTime.Unix()),
|
||||||
|
CloseTimeStamp: uint64(closeTime.Unix()),
|
||||||
|
}
|
||||||
|
}
|
||||||
func Segment2JSON(s Segment) (string, error) {
|
func Segment2JSON(s Segment) (string, error) {
|
||||||
b, err := json.Marshal(&s)
|
b, err := json.Marshal(&s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -59,16 +72,3 @@ func JSON2Segment(s string) (*Segment, error) {
|
||||||
}
|
}
|
||||||
return &c, nil
|
return &c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func FakeCreateSegment(id uint64, cl Collection, opentime time.Time, closetime time.Time) Segment {
|
|
||||||
seg := Segment{
|
|
||||||
SegmentID: id,
|
|
||||||
Collection: cl,
|
|
||||||
PartitionTag: "default",
|
|
||||||
ChannelStart: 0,
|
|
||||||
ChannelEnd: 100,
|
|
||||||
OpenTimeStamp: opentime,
|
|
||||||
CloseTimeStamp: closetime,
|
|
||||||
}
|
|
||||||
return seg
|
|
||||||
}
|
|
||||||
|
|
|
@ -34,15 +34,15 @@ var Ts = Segment{
|
||||||
Collection: Collection{
|
Collection: Collection{
|
||||||
ID: uint64(11111),
|
ID: uint64(11111),
|
||||||
Name: "test-collection",
|
Name: "test-collection",
|
||||||
CreateTime: time.Now(),
|
CreateTime: uint64(time.Now().Unix()),
|
||||||
SegmentIDs: []uint64{uint64(10111)},
|
SegmentIDs: []uint64{uint64(10111)},
|
||||||
PartitionTags: []string{"default"},
|
PartitionTags: []string{"default"},
|
||||||
},
|
},
|
||||||
PartitionTag: "default",
|
PartitionTag: "default",
|
||||||
ChannelStart: 1,
|
ChannelStart: 1,
|
||||||
ChannelEnd: 100,
|
ChannelEnd: 100,
|
||||||
OpenTimeStamp: time.Now(),
|
OpenTimeStamp: uint64(time.Now().Unix()),
|
||||||
CloseTimeStamp: time.Now().Add(1 * time.Hour),
|
CloseTimeStamp: uint64(time.Now().Add(1 * time.Hour).Unix()),
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSegment2JSON(t *testing.T) {
|
func TestSegment2JSON(t *testing.T) {
|
||||||
|
|
|
@ -68,7 +68,7 @@ func ComputeCloseTime(ss mock.SegmentStats, kvbase kv.Base) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
seg.CloseTimeStamp = time.Now().Add(time.Duration(sec) * time.Second)
|
seg.CloseTimeStamp = uint64(time.Now().Add(time.Duration(sec) * time.Second).Unix())
|
||||||
updateData, err := mock.Segment2JSON(*seg)
|
updateData, err := mock.Segment2JSON(*seg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -119,25 +119,27 @@ func CollectionController(ch chan *messagepb.Mapping) {
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
kvbase := kv.NewEtcdKVBase(cli, common.ETCD_ROOT_PATH)
|
kvbase := kv.NewEtcdKVBase(cli, common.ETCD_ROOT_PATH)
|
||||||
for collection := range ch {
|
for collection := range ch {
|
||||||
pTag := uuid.New()
|
sID := uuid.New()
|
||||||
cID := uuid.New()
|
cID := uuid.New()
|
||||||
c := mock.Collection{
|
fieldMetas := []*messagepb.FieldMeta{}
|
||||||
Name: collection.CollectionName,
|
if collection.Schema != nil {
|
||||||
CreateTime: time.Now(),
|
fieldMetas = collection.Schema.FieldMetas
|
||||||
ID: uint64(cID.ID()),
|
|
||||||
PartitionTags: []string{pTag.String()},
|
|
||||||
}
|
}
|
||||||
s := mock.FakeCreateSegment(uint64(pTag.ID()), c, time.Now(), time.Unix(1<<36-1, 0))
|
c := mock.NewCollection(cID, collection.CollectionName,
|
||||||
collectionData, _ := mock.Collection2JSON(c)
|
time.Now(), fieldMetas, []uuid.UUID{sID},
|
||||||
|
[]string{"default"})
|
||||||
|
cm := mock.GrpcMarshal(&c)
|
||||||
|
s := mock.NewSegment(sID, c, "default", 0, 100, time.Now(), time.Unix(1<<36-1, 0))
|
||||||
|
collectionData, _ := mock.Collection2JSON(*cm)
|
||||||
segmentData, err := mock.Segment2JSON(s)
|
segmentData, err := mock.Segment2JSON(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
err = kvbase.Save(cID.String(), collectionData)
|
err = kvbase.Save("collection/"+cID.String(), collectionData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
err = kvbase.Save(pTag.String(), segmentData)
|
err = kvbase.Save("segment/"+sID.String(), segmentData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue