Use type alias

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/4973/head^2
zhenshan.cao 2020-11-04 17:58:43 +08:00 committed by yefu.chen
parent 9c807cc4df
commit e9122921b9
53 changed files with 480 additions and 825 deletions

View File

@ -1,89 +0,0 @@
package main
import (
"context"
"flag"
"fmt"
"log"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/msgclient"
"github.com/zilliztech/milvus-distributed/internal/storage"
"github.com/zilliztech/milvus-distributed/internal/writer"
)
func main() {
var yamlFile string
flag.StringVar(&yamlFile, "yaml", "", "yaml file")
flag.Parse()
// flag.Usage()
fmt.Println("yaml file: ", yamlFile)
conf.LoadConfig(yamlFile)
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
pulsarAddr += ":"
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
mc := msgclient.WriterMessageClient{}
mc.InitClient(pulsarAddr)
//TODO::close client / consumer/ producer
mc.ReceiveMessage()
ctx := context.Background()
kv, err := storage.NewStore(ctx, conf.Config.Storage.Driver)
// TODO:: if err != nil, should retry link
if err != nil {
log.Fatal(err)
}
msgCounter := writer.MsgCounter{
InsertCounter: 0,
DeleteCounter: 0,
}
wn := writer.WriteNode{
KvStore: &kv,
MessageClient: &mc,
TimeSync: 100,
MsgCounter: &msgCounter,
}
const Debug = true
if Debug {
const CountInsertMsgBaseline = 1000 * 1000
var BaselineCounter int64 = 0
for {
if ctx.Err() != nil {
break
}
msgLength := wn.MessageClient.PrepareBatchMsg()
if wn.MsgCounter.InsertCounter/CountInsertMsgBaseline != BaselineCounter {
wn.WriteWriterLog()
BaselineCounter = wn.MsgCounter.InsertCounter / CountInsertMsgBaseline
}
if msgLength > 0 {
wn.DoWriteNode(ctx)
fmt.Println("write node do a batch message, storage len: ", msgLength)
}
}
}
//TODO:: start a gorouter for searchById
for {
if ctx.Err() != nil {
break
}
msgLength := wn.MessageClient.PrepareBatchMsg()
if msgLength > 0 {
wn.DoWriteNode(ctx)
fmt.Println("write node do a batch message, storage len: ", msgLength)
}
}
wn.Close()
}

View File

@ -42,7 +42,7 @@ func (req *baseRequest) Notify(err error) {
type idRequest struct {
baseRequest
id uint64
id UniqueID
count uint32
}
@ -52,7 +52,7 @@ func (req *idRequest) Wait() {
type tsoRequest struct {
baseRequest
timestamp uint64
timestamp Timestamp
count uint32
}

View File

@ -7,13 +7,16 @@ import (
"time"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type UniqueID = typeutil.UniqueID
type IdAllocator struct {
Allocator
idStart int64
idEnd int64
idStart UniqueID
idEnd UniqueID
}
func NewIdAllocator(ctx context.Context) (*IdAllocator, error) {
@ -56,7 +59,7 @@ func (ta *IdAllocator) processFunc(req request) {
fmt.Println("process Id")
}
func (ta *IdAllocator) AllocOne() (int64, error) {
func (ta *IdAllocator) AllocOne() (UniqueID, error) {
ret, _, err := ta.Alloc(1)
if err != nil {
return 0, err
@ -64,7 +67,7 @@ func (ta *IdAllocator) AllocOne() (int64, error) {
return ret, nil
}
func (ta *IdAllocator) Alloc(count uint32) (int64, int64, error) {
func (ta *IdAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) {
req := &idRequest{baseRequest: baseRequest{done: make(chan error), valid: false}}
req.count = count
@ -74,6 +77,6 @@ func (ta *IdAllocator) Alloc(count uint32) (int64, int64, error) {
if !req.IsValid() {
return 0, 0, nil
}
start, count := int64(req.id), req.count
start, count := req.id, req.count
return start, start + int64(count), nil
}

View File

@ -6,10 +6,12 @@ import (
"log"
"time"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
type Timestamp = uint64
type Timestamp = typeutil.Timestamp
const (
tsCountPerRPC = 2 << 18 * 10
@ -18,8 +20,8 @@ const (
type TimestampAllocator struct {
Allocator
lastTsBegin uint64
lastTsEnd uint64
lastTsBegin Timestamp
lastTsEnd Timestamp
}
func NewTimestampAllocator(ctx context.Context) (*TimestampAllocator, error) {

View File

@ -5,10 +5,14 @@ import (
"path"
"runtime"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
storagetype "github.com/zilliztech/milvus-distributed/internal/storage/type"
yaml "gopkg.in/yaml.v2"
)
type UniqueID = typeutil.UniqueID
// yaml.MapSlice
type MasterConfig struct {
@ -17,7 +21,7 @@ type MasterConfig struct {
PulsarMoniterInterval int32
PulsarTopic string
SegmentThreshole float32
ProxyIdList []int64
ProxyIdList []UniqueID
QueryNodeNum int
WriteNodeNum int
}

View File

@ -3,6 +3,8 @@ package collection
import (
"time"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/gogo/protobuf/proto"
jsoniter "github.com/json-iterator/go"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
@ -11,15 +13,18 @@ import (
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type Collection struct {
ID int64 `json:"id"`
ID UniqueID `json:"id"`
Name string `json:"name"`
CreateTime uint64 `json:"creat_time"`
CreateTime Timestamp `json:"creat_time"`
Schema []FieldMeta `json:"schema"`
// ExtraSchema []FieldMeta `json:"extra_schema"`
SegmentIDs []int64 `json:"segment_ids"`
PartitionTags []string `json:"partition_tags"`
GrpcMarshalString string `json:"grpc_marshal_string"`
SegmentIDs []UniqueID `json:"segment_ids"`
PartitionTags []string `json:"partition_tags"`
GrpcMarshalString string `json:"grpc_marshal_string"`
}
type FieldMeta struct {
@ -56,10 +61,10 @@ func GrpcMarshal(c *Collection) *Collection {
return c
}
func NewCollection(id int64, name string, createTime time.Time,
schema []*schemapb.FieldSchema, sIds []int64, ptags []string) Collection {
func NewCollection(id UniqueID, name string, createTime time.Time,
schema []*schemapb.FieldSchema, sIds []UniqueID, ptags []string) Collection {
segementIDs := []int64{}
segementIDs := []UniqueID{}
newSchema := []FieldMeta{}
for _, v := range schema {
newSchema = append(newSchema, FieldMeta{FieldName: v.Name, Type: v.DataType, DIM: 16})
@ -70,7 +75,7 @@ func NewCollection(id int64, name string, createTime time.Time,
return Collection{
ID: id,
Name: name,
CreateTime: uint64(createTime.Unix()),
CreateTime: Timestamp(createTime.Unix()),
Schema: newSchema,
SegmentIDs: segementIDs,
PartitionTags: ptags,

View File

@ -9,11 +9,11 @@ import (
)
var (
cid = int64(10011111234)
cid = UniqueID(10011111234)
name = "test-segment"
createTime = time.Now()
schema = []*schemapb.FieldSchema{}
sIds = []int64{111111, 222222}
sIds = []UniqueID{111111, 222222}
ptags = []string{"default", "test"}
)

View File

@ -6,6 +6,8 @@ import (
"log"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
@ -13,6 +15,8 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
)
type Timestamp = typeutil.Timestamp
const collectionMetaPrefix = "collection/"
type createCollectionTask struct {
@ -73,16 +77,16 @@ func (t *createCollectionTask) Execute() error {
}
// TODO: allocate collection id
var collectionId int64 = 0
var collectionId UniqueID = 0
// TODO: allocate timestamp
var collectionCreateTime uint64 = 0
var collectionCreateTime Timestamp = 0
collection := etcdpb.CollectionMeta{
Id: collectionId,
Schema: &schema,
CreateTime: collectionCreateTime,
// TODO: initial segment?
SegmentIds: make([]int64, 0),
SegmentIds: make([]UniqueID, 0),
// TODO: initial partition?
PartitionTags: make([]string, 0),
}

View File

@ -5,6 +5,8 @@ import (
"strconv"
"time"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/master/id"
"github.com/zilliztech/milvus-distributed/internal/conf"
@ -14,6 +16,8 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
)
type UniqueID = typeutil.UniqueID
func CollectionController(ch chan *schemapb.CollectionSchema, kvbase kv.Base, errch chan error) {
for collectionMeta := range ch {
sID, _ := id.AllocOne()
@ -24,7 +28,7 @@ func CollectionController(ch chan *schemapb.CollectionSchema, kvbase kv.Base, er
fieldMetas = collectionMeta.Fields
}
c := collection.NewCollection(cID, collectionMeta.Name,
time.Now(), fieldMetas, []int64{sID, s2ID},
time.Now(), fieldMetas, []UniqueID{sID, s2ID},
[]string{"default"})
cm := collection.GrpcMarshal(&c)
s := segment.NewSegment(sID, cID, collectionMeta.Name, "default", 0, 511, time.Now(), time.Unix(1<<36-1, 0))
@ -61,7 +65,7 @@ func WriteCollection2Datastore(collectionMeta *schemapb.CollectionSchema, kvbase
fieldMetas = collectionMeta.Fields
}
c := collection.NewCollection(cID, collectionMeta.Name,
time.Now(), fieldMetas, []int64{sID},
time.Now(), fieldMetas, []UniqueID{sID},
[]string{"default"})
cm := collection.GrpcMarshal(&c)
s := segment.NewSegment(sID, cID, collectionMeta.Name, "default", 0, conf.Config.Pulsar.TopicNum, time.Now(), time.Unix(1<<46-1, 0))

View File

@ -28,7 +28,7 @@ func TestComputeClosetTime(t *testing.T) {
var news internalpb.SegmentStatistics
for i := 0; i < 10; i++ {
news = internalpb.SegmentStatistics{
SegmentId: int64(6875940398055133887),
SegmentId: UniqueID(6875940398055133887),
MemorySize: int64(i * 1000),
}
ComputeCloseTime(news, kvbase)

View File

@ -15,8 +15,11 @@ package id
import (
"github.com/zilliztech/milvus-distributed/internal/master/tso"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type UniqueID = typeutil.UniqueID
// GlobalTSOAllocator is the global single point TSO allocator.
type GlobalIdAllocator struct {
allocator tso.Allocator
@ -35,29 +38,29 @@ func (gia *GlobalIdAllocator) Initialize() error {
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gia *GlobalIdAllocator) Alloc(count uint32) (int64, int64, error) {
func (gia *GlobalIdAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) {
timestamp, err := gia.allocator.GenerateTSO(count)
if err != nil {
return 0, 0, err
}
idStart := int64(timestamp)
idStart := UniqueID(timestamp)
idEnd := idStart + int64(count)
return idStart, idEnd, nil
}
func (gia *GlobalIdAllocator) AllocOne() (int64, error) {
func (gia *GlobalIdAllocator) AllocOne() (UniqueID, error) {
timestamp, err := gia.allocator.GenerateTSO(1)
if err != nil {
return 0, err
}
idStart := int64(timestamp)
idStart := UniqueID(timestamp)
return idStart, nil
}
func AllocOne() (int64, error) {
func AllocOne() (UniqueID, error) {
return allocator.AllocOne()
}
func Alloc(count uint32) (int64, int64, error) {
func Alloc(count uint32) (UniqueID, UniqueID, error) {
return allocator.Alloc(count)
}

View File

@ -4,19 +4,23 @@ import (
"strconv"
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
)
type UniqueID = typeutil.UniqueID
type metaTable struct {
client kv.Base // client of a reliable kv service, i.e. etcd client
tenantId2Meta map[int64]pb.TenantMeta // tenant id to tenant meta
proxyId2Meta map[int64]pb.ProxyMeta // proxy id to proxy meta
collId2Meta map[int64]pb.CollectionMeta // collection id to collection meta
collName2Id map[string]int64 // collection name to collection id
segId2Meta map[int64]pb.SegmentMeta // segment id to segment meta
client kv.Base // client of a reliable kv service, i.e. etcd client
tenantId2Meta map[UniqueID]pb.TenantMeta // tenant id to tenant meta
proxyId2Meta map[UniqueID]pb.ProxyMeta // proxy id to proxy meta
collId2Meta map[UniqueID]pb.CollectionMeta // collection id to collection meta
collName2Id map[string]UniqueID // collection name to collection id
segId2Meta map[UniqueID]pb.SegmentMeta // segment id to segment meta
tenantLock sync.RWMutex
proxyLock sync.RWMutex
@ -39,11 +43,11 @@ func NewMetaTable(kv kv.Base) (*metaTable, error) {
func (mt *metaTable) reloadFromKV() error {
mt.tenantId2Meta = make(map[int64]pb.TenantMeta)
mt.proxyId2Meta = make(map[int64]pb.ProxyMeta)
mt.collId2Meta = make(map[int64]pb.CollectionMeta)
mt.collName2Id = make(map[string]int64)
mt.segId2Meta = make(map[int64]pb.SegmentMeta)
mt.tenantId2Meta = make(map[UniqueID]pb.TenantMeta)
mt.proxyId2Meta = make(map[UniqueID]pb.ProxyMeta)
mt.collId2Meta = make(map[UniqueID]pb.CollectionMeta)
mt.collName2Id = make(map[string]UniqueID)
mt.segId2Meta = make(map[UniqueID]pb.SegmentMeta)
_, values, err := mt.client.LoadWithPrefix("tenant")
if err != nil {
@ -129,7 +133,7 @@ func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error {
}
// mt.ddLock.Lock() before call this function
func (mt *metaTable) deleteSegmentMeta(segId int64) error {
func (mt *metaTable) deleteSegmentMeta(segId UniqueID) error {
_, ok := mt.segId2Meta[segId]
if ok {
@ -140,7 +144,7 @@ func (mt *metaTable) deleteSegmentMeta(segId int64) error {
}
// mt.ddLock.Lock() before call this function
func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta, segIds []int64) error {
func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta, segIds []UniqueID) error {
segIdStrs := make([]string, 0, len(segIds))
for _, segId := range segIds {
segIdStrs = append(segIdStrs, "/segment/"+strconv.FormatInt(segId, 10))
@ -191,7 +195,7 @@ func (mt *metaTable) saveCollectionsAndSegmentsMeta(coll *pb.CollectionMeta, seg
}
// mt.ddLock.Lock() before call this function
func (mt *metaTable) deleteCollectionsAndSegmentsMeta(collId int64, segIds []int64) error {
func (mt *metaTable) deleteCollectionsAndSegmentsMeta(collId UniqueID, segIds []UniqueID) error {
collIdStr := "/collection/" + strconv.FormatInt(collId, 10)
totalIdStrs := make([]string, 0, 1+len(segIds))
@ -244,7 +248,7 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionMeta) error {
return nil
}
func (mt *metaTable) DeleteCollection(collId int64) error {
func (mt *metaTable) DeleteCollection(collId UniqueID) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -261,7 +265,7 @@ func (mt *metaTable) DeleteCollection(collId int64) error {
return nil
}
func (mt *metaTable) HasCollection(collId int64) bool {
func (mt *metaTable) HasCollection(collId UniqueID) bool {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
_, ok := mt.collId2Meta[collId]
@ -286,7 +290,7 @@ func (mt *metaTable) GetCollectionByName(collectionName string) (*pb.CollectionM
return &col, nil
}
func (mt *metaTable) AddPartition(collId int64, tag string) error {
func (mt *metaTable) AddPartition(collId UniqueID, tag string) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
coll, ok := mt.collId2Meta[collId]
@ -309,7 +313,7 @@ func (mt *metaTable) AddPartition(collId int64, tag string) error {
return nil
}
func (mt *metaTable) HasPartition(collId int64, tag string) bool {
func (mt *metaTable) HasPartition(collId UniqueID, tag string) bool {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
col, ok := mt.collId2Meta[collId]
@ -324,7 +328,7 @@ func (mt *metaTable) HasPartition(collId int64, tag string) bool {
return false
}
func (mt *metaTable) DeletePartition(collId int64, tag string) error {
func (mt *metaTable) DeletePartition(collId UniqueID, tag string) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -343,8 +347,8 @@ func (mt *metaTable) DeletePartition(collId int64, tag string) error {
return nil
}
to_delete_seg := make([]int64, 0, len(coll_meta.SegmentIds))
seg := make([]int64, 0, len(coll_meta.SegmentIds))
to_delete_seg := make([]UniqueID, 0, len(coll_meta.SegmentIds))
seg := make([]UniqueID, 0, len(coll_meta.SegmentIds))
for _, s := range coll_meta.SegmentIds {
sm, ok := mt.segId2Meta[s]
if !ok {
@ -381,7 +385,7 @@ func (mt *metaTable) AddSegment(seg *pb.SegmentMeta) error {
return nil
}
func (mt *metaTable) GetSegmentById(segId int64) (*pb.SegmentMeta, error) {
func (mt *metaTable) GetSegmentById(segId UniqueID) (*pb.SegmentMeta, error) {
mt.ddLock.RLock()
defer mt.ddLock.RUnlock()
@ -392,7 +396,7 @@ func (mt *metaTable) GetSegmentById(segId int64) (*pb.SegmentMeta, error) {
return &sm, nil
}
func (mt *metaTable) DeleteSegment(segId int64) error {
func (mt *metaTable) DeleteSegment(segId UniqueID) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -412,7 +416,7 @@ func (mt *metaTable) DeleteSegment(segId int64) error {
}
}
err := mt.saveCollectionAndDeleteSegmentsMeta(&coll_meta, []int64{segId})
err := mt.saveCollectionAndDeleteSegmentsMeta(&coll_meta, []UniqueID{segId})
if err != nil {
_ = mt.reloadFromKV()
return err
@ -420,7 +424,7 @@ func (mt *metaTable) DeleteSegment(segId int64) error {
return nil
}
func (mt *metaTable) CloseSegment(segId int64, closeTs Timestamp, num_rows int64) error {
func (mt *metaTable) CloseSegment(segId UniqueID, closeTs Timestamp, num_rows int64) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
@ -429,7 +433,7 @@ func (mt *metaTable) CloseSegment(segId int64, closeTs Timestamp, num_rows int64
return errors.Errorf("can't find segment id = " + strconv.FormatInt(segId, 10))
}
seg_meta.CloseTime = uint64(closeTs)
seg_meta.CloseTime = closeTs
seg_meta.NumRows = num_rows
err := mt.saveSegmentMeta(&seg_meta)

View File

@ -25,7 +25,7 @@ func TestMetaTable_Collection(t *testing.T) {
Name: "coll1",
},
CreateTime: 0,
SegmentIds: []int64{},
SegmentIds: []UniqueID{},
PartitionTags: []string{},
}
col_meta_2 := pb.CollectionMeta{
@ -34,7 +34,7 @@ func TestMetaTable_Collection(t *testing.T) {
Name: "coll1",
},
CreateTime: 0,
SegmentIds: []int64{},
SegmentIds: []UniqueID{},
PartitionTags: []string{},
}
col_meta_3 := pb.CollectionMeta{
@ -43,7 +43,7 @@ func TestMetaTable_Collection(t *testing.T) {
Name: "coll2",
},
CreateTime: 0,
SegmentIds: []int64{},
SegmentIds: []UniqueID{},
PartitionTags: []string{},
}
col_meta_4 := pb.CollectionMeta{
@ -52,7 +52,7 @@ func TestMetaTable_Collection(t *testing.T) {
Name: "coll2",
},
CreateTime: 0,
SegmentIds: []int64{1},
SegmentIds: []UniqueID{1},
PartitionTags: []string{},
}
col_meta_5 := pb.CollectionMeta{
@ -61,7 +61,7 @@ func TestMetaTable_Collection(t *testing.T) {
Name: "coll2",
},
CreateTime: 0,
SegmentIds: []int64{1},
SegmentIds: []UniqueID{1},
PartitionTags: []string{"1"},
}
seg_id_1 := pb.SegmentMeta{
@ -143,7 +143,7 @@ func TestMetaTable_DeletePartition(t *testing.T) {
Name: "coll1",
},
CreateTime: 0,
SegmentIds: []int64{},
SegmentIds: []UniqueID{},
PartitionTags: []string{},
}
seg_id_1 := pb.SegmentMeta{
@ -226,7 +226,7 @@ func TestMetaTable_Segment(t *testing.T) {
Name: "coll1",
},
CreateTime: 0,
SegmentIds: []int64{},
SegmentIds: []UniqueID{},
PartitionTags: []string{},
}
seg_meta := pb.SegmentMeta{
@ -258,11 +258,11 @@ func TestMetaTable_Segment(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, 0, len(get_col_meta.SegmentIds))
meta.tenantId2Meta = make(map[int64]pb.TenantMeta)
meta.proxyId2Meta = make(map[int64]pb.ProxyMeta)
meta.collId2Meta = make(map[int64]pb.CollectionMeta)
meta.collName2Id = make(map[string]int64)
meta.segId2Meta = make(map[int64]pb.SegmentMeta)
meta.tenantId2Meta = make(map[UniqueID]pb.TenantMeta)
meta.proxyId2Meta = make(map[UniqueID]pb.ProxyMeta)
meta.collId2Meta = make(map[UniqueID]pb.CollectionMeta)
meta.collName2Id = make(map[string]UniqueID)
meta.segId2Meta = make(map[UniqueID]pb.SegmentMeta)
err = meta.reloadFromKV()
assert.Nil(t, err)

View File

@ -3,24 +3,29 @@ package segment
import (
"time"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
jsoniter "github.com/json-iterator/go"
)
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
var json = jsoniter.ConfigCompatibleWithStandardLibrary
type Segment struct {
SegmentID int64 `json:"segment_id"`
CollectionID int64 `json:"collection_id"`
PartitionTag string `json:"partition_tag"`
ChannelStart int `json:"channel_start"`
ChannelEnd int `json:"channel_end"`
OpenTimeStamp uint64 `json:"open_timestamp"`
CloseTimeStamp uint64 `json:"close_timestamp"`
CollectionName string `json:"collection_name"`
Rows int64 `json:"rows"`
SegmentID UniqueID `json:"segment_id"`
CollectionID UniqueID `json:"collection_id"`
PartitionTag string `json:"partition_tag"`
ChannelStart int `json:"channel_start"`
ChannelEnd int `json:"channel_end"`
OpenTimeStamp Timestamp `json:"open_timestamp"`
CloseTimeStamp Timestamp `json:"close_timestamp"`
CollectionName string `json:"collection_name"`
Rows int64 `json:"rows"`
}
func NewSegment(id int64, collectioID int64, cName string, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment {
func NewSegment(id UniqueID, collectioID UniqueID, cName string, ptag string, chStart int, chEnd int, openTime time.Time, closeTime time.Time) Segment {
return Segment{
SegmentID: id,
CollectionID: collectioID,
@ -28,8 +33,8 @@ func NewSegment(id int64, collectioID int64, cName string, ptag string, chStart
PartitionTag: ptag,
ChannelStart: chStart,
ChannelEnd: chEnd,
OpenTimeStamp: uint64(openTime.Unix()),
CloseTimeStamp: uint64(closeTime.Unix()),
OpenTimeStamp: Timestamp(openTime.Unix()),
CloseTimeStamp: Timestamp(closeTime.Unix()),
}
}

View File

@ -9,7 +9,6 @@ import (
)
// TODO: get timestamp from timestampOracle
type Timestamp uint64
type baseTask struct {
kvBase *kv.Base

View File

@ -8,6 +8,8 @@ import (
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/apache/pulsar-client-go/pulsar"
@ -15,6 +17,9 @@ import (
pb "github.com/zilliztech/milvus-distributed/internal/proto/message"
)
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
const stopReadFlagId int64 = -1
type TimeTickReader struct {
@ -24,9 +29,9 @@ type TimeTickReader struct {
readerProducer []pulsar.Producer
interval int64
proxyIdList []int64
proxyIdList []UniqueID
timeTickPeerProxy map[int64]uint64
timeTickPeerProxy map[UniqueID]Timestamp
ctx context.Context
}
@ -58,7 +63,7 @@ func (r *TimeTickReader) timeSync() {
return
default:
time.Sleep(time.Millisecond * time.Duration(r.interval))
var minTimeStamp uint64
var minTimeStamp Timestamp
for _, minTimeStamp = range r.timeTickPeerProxy {
break
}
@ -134,7 +139,7 @@ func newTimeTickReader(
timeTickTopic string,
timeTickSubName string,
readTopics []string,
proxyIdList []int64,
proxyIdList []UniqueID,
) *TimeTickReader {
pulsarAddr := "pulsar://"
pulsarAddr += conf.Config.Pulsar.Address
@ -168,7 +173,7 @@ func newTimeTickReader(
readerQueueSize = 1024
}
r.timeTickPeerProxy = make(map[int64]uint64)
r.timeTickPeerProxy = make(map[UniqueID]Timestamp)
r.ctx = ctx
var client pulsar.Client

View File

@ -6,6 +6,8 @@ import (
"log"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/conf"
@ -25,6 +27,9 @@ const (
Statistics MessageType = 5
)
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type ReaderMessageClient struct {
// context
ctx context.Context
@ -39,7 +44,7 @@ type ReaderMessageClient struct {
// pulsar
client pulsar.Client
//searchResultProducer pulsar.Producer
searchResultProducers map[int64]pulsar.Producer
searchResultProducers map[UniqueID]pulsar.Producer
segmentsStatisticProducer pulsar.Producer
searchConsumer pulsar.Consumer
key2segConsumer pulsar.Consumer
@ -48,27 +53,27 @@ type ReaderMessageClient struct {
InsertOrDeleteMsg []*msgpb.InsertOrDeleteMsg
Key2SegMsg []*msgpb.Key2SegMsg
SearchMsg []*msgpb.SearchMsg
timestampBatchStart uint64
timestampBatchEnd uint64
timestampBatchStart Timestamp
timestampBatchEnd Timestamp
batchIDLen int
//client id
MessageClientID int
}
func (mc *ReaderMessageClient) GetTimeNow() uint64 {
func (mc *ReaderMessageClient) GetTimeNow() Timestamp {
return mc.timestampBatchEnd
}
func (mc *ReaderMessageClient) TimeSyncStart() uint64 {
func (mc *ReaderMessageClient) TimeSyncStart() Timestamp {
return mc.timestampBatchStart
}
func (mc *ReaderMessageClient) TimeSyncEnd() uint64 {
func (mc *ReaderMessageClient) TimeSyncEnd() Timestamp {
return mc.timestampBatchEnd
}
func (mc *ReaderMessageClient) SendResult(ctx context.Context, msg msgpb.QueryResult, producerKey int64) {
func (mc *ReaderMessageClient) SendResult(ctx context.Context, msg msgpb.QueryResult, producerKey UniqueID) {
var msgBuffer, _ = proto.Marshal(&msg)
if _, err := mc.searchResultProducers[producerKey].Send(ctx, &pulsar.ProducerMessage{
Payload: msgBuffer,
@ -203,7 +208,7 @@ func (mc *ReaderMessageClient) InitClient(ctx context.Context, url string) {
mc.MessageClientID = conf.Config.Reader.ClientId
//create producer
mc.searchResultProducers = make(map[int64]pulsar.Producer)
mc.searchResultProducers = make(map[UniqueID]pulsar.Producer)
proxyIdList := conf.Config.Master.ProxyIdList
searchResultTopicName := "SearchResult-"

View File

@ -17,23 +17,23 @@ func GetMarshalers(inputMsgType MsgType, outputMsgType MsgType) (*TsMsgMarshaler
func GetMarshaler(MsgType MsgType) *TsMsgMarshaler {
switch MsgType {
case KInsert:
case internalPb.MsgType_kInsert:
insertMarshaler := &InsertMarshaler{}
var tsMsgMarshaller TsMsgMarshaler = insertMarshaler
return &tsMsgMarshaller
case KDelete:
case internalPb.MsgType_kDelete:
deleteMarshaler := &DeleteMarshaler{}
var tsMsgMarshaller TsMsgMarshaler = deleteMarshaler
return &tsMsgMarshaller
case KSearch:
case internalPb.MsgType_kSearch:
searchMarshaler := &SearchMarshaler{}
var tsMsgMarshaller TsMsgMarshaler = searchMarshaler
return &tsMsgMarshaller
case KSearchResult:
case internalPb.MsgType_kSearchResult:
searchResultMarshler := &SearchResultMarshaler{}
var tsMsgMarshaller TsMsgMarshaler = searchResultMarshler
return &tsMsgMarshaller
case KTimeTick:
case internalPb.MsgType_kTimeTick:
timeTickMarshaler := &TimeTickMarshaler{}
var tsMsgMarshaller TsMsgMarshaler = timeTickMarshaler
return &tsMsgMarshaller

View File

@ -5,14 +5,20 @@ import (
"log"
"sync"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/apache/pulsar-client-go/pulsar"
commonPb "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type IntPrimaryKey = typeutil.IntPrimaryKey
type MsgPack struct {
BeginTs typeutil.Timestamp
EndTs typeutil.Timestamp
BeginTs Timestamp
EndTs Timestamp
Msgs []*TsMsg
}
@ -230,7 +236,7 @@ type PulsarTtMsgStream struct {
inputBuf []*TsMsg
unsolvedBuf []*TsMsg
msgPacks []*MsgPack
lastTimeStamp typeutil.Timestamp
lastTimeStamp Timestamp
}
func (ms *PulsarTtMsgStream) Start() {
@ -240,7 +246,7 @@ func (ms *PulsarTtMsgStream) Start() {
func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
wg := sync.WaitGroup{}
wg.Add(len(ms.consumers))
eofMsgTimeStamp := make(map[int]typeutil.Timestamp)
eofMsgTimeStamp := make(map[int]Timestamp)
mu := sync.Mutex{}
for i := 0; i < len(ms.consumers); i++ {
go ms.findTimeTick(context.Background(), i, eofMsgTimeStamp, &wg, &mu)
@ -274,7 +280,7 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
func (ms *PulsarTtMsgStream) findTimeTick(ctx context.Context,
channelIndex int,
eofMsgMap map[int]typeutil.Timestamp,
eofMsgMap map[int]Timestamp,
wg *sync.WaitGroup,
mu *sync.Mutex) {
for {
@ -289,7 +295,7 @@ func (ms *PulsarTtMsgStream) findTimeTick(ctx context.Context,
(*ms.consumers[channelIndex]).Ack(pulsarMsg)
tsMsg, status := (*ms.msgUnmarshaler).Unmarshal(pulsarMsg.Payload())
// TODO:: Find the EOF
if (*tsMsg).Type() == KTimeTick {
if (*tsMsg).Type() == internalPb.MsgType_kTimeTick {
eofMsgMap[channelIndex] = (*tsMsg).EndTs()
wg.Done()
return
@ -304,8 +310,8 @@ func (ms *PulsarTtMsgStream) findTimeTick(ctx context.Context,
}
}
func checkTimeTickMsg(msg map[int]typeutil.Timestamp) (typeutil.Timestamp, bool) {
checkMap := make(map[typeutil.Timestamp]int)
func checkTimeTickMsg(msg map[int]Timestamp) (Timestamp, bool) {
checkMap := make(map[Timestamp]int)
for _, v := range msg {
checkMap[v] += 1
}

View File

@ -24,10 +24,10 @@ func repackFunc(msgs []*TsMsg, hashKeys [][]int32) map[int32]*MsgPack {
return result
}
func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg {
func getTsMsg(msgType MsgType, reqId UniqueID, hashValue int32) *TsMsg {
var tsMsg TsMsg
switch msgType {
case KInsert:
case internalPb.MsgType_kInsert:
insertRequest := internalPb.InsertRequest{
MsgType: internalPb.MsgType_kInsert,
ReqId: reqId,
@ -36,29 +36,29 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg {
SegmentId: 1,
ChannelId: 1,
ProxyId: 1,
Timestamps: []uint64{1},
Timestamps: []Timestamp{1},
}
insertMsg := InsertTask{
HashValues: []int32{hashValue},
InsertRequest: insertRequest,
}
tsMsg = insertMsg
case KDelete:
case internalPb.MsgType_kDelete:
deleteRequest := internalPb.DeleteRequest{
MsgType: internalPb.MsgType_kDelete,
ReqId: reqId,
CollectionName: "Collection",
ChannelId: 1,
ProxyId: 1,
Timestamps: []uint64{1},
PrimaryKeys: []int64{1},
Timestamps: []Timestamp{1},
PrimaryKeys: []IntPrimaryKey{1},
}
deleteMsg := DeleteTask{
HashValues: []int32{hashValue},
DeleteRequest: deleteRequest,
}
tsMsg = deleteMsg
case KSearch:
case internalPb.MsgType_kSearch:
searchRequest := internalPb.SearchRequest{
MsgType: internalPb.MsgType_kSearch,
ReqId: reqId,
@ -71,7 +71,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg {
SearchRequest: searchRequest,
}
tsMsg = searchMsg
case KSearchResult:
case internalPb.MsgType_kSearchResult:
searchResult := internalPb.SearchResult{
Status: &commonPb.Status{ErrorCode: commonPb.ErrorCode_SUCCESS},
ReqId: reqId,
@ -85,7 +85,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg {
SearchResult: searchResult,
}
tsMsg = searchResultMsg
case KTimeTick:
case internalPb.MsgType_kTimeTick:
timeTickResult := internalPb.TimeTickMsg{
PeerId: reqId,
Timestamp: 1,
@ -131,7 +131,6 @@ func initStream(pulsarAddress string,
//outputStream.Start()
}
// receive msg
receiveCount := 0
for {
@ -144,7 +143,7 @@ func initStream(pulsarAddress string,
}
}
if broadCast {
if receiveCount >= len(msgPack.Msgs) * len(producerChannels) {
if receiveCount >= len(msgPack.Msgs)*len(producerChannels) {
break
}
} else {
@ -162,11 +161,11 @@ func TestStream_Insert(t *testing.T) {
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1))
//run stream
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, KInsert, KInsert, false)
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kInsert, internalPb.MsgType_kInsert, false)
}
func TestStream_Delete(t *testing.T) {
@ -176,11 +175,11 @@ func TestStream_Delete(t *testing.T) {
consumerSubName := "subDelete"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KDelete, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KDelete, 3, 3))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 3, 3))
//run stream
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, KDelete, KDelete, false)
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kDelete, internalPb.MsgType_kDelete, false)
}
func TestStream_Search(t *testing.T) {
@ -190,11 +189,11 @@ func TestStream_Search(t *testing.T) {
consumerSubName := "subSearch"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearch, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearch, 3, 3))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 3, 3))
//run stream
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, KSearch, KSearch, false)
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kSearch, internalPb.MsgType_kSearch, false)
}
func TestStream_SearchResult(t *testing.T) {
@ -204,11 +203,11 @@ func TestStream_SearchResult(t *testing.T) {
consumerSubName := "subSearch"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearchResult, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearchResult, 3, 3))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 3, 3))
//run stream
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, KSearchResult, KSearchResult, false)
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kSearchResult, internalPb.MsgType_kSearchResult, false)
}
func TestStream_TimeTick(t *testing.T) {
@ -218,14 +217,13 @@ func TestStream_TimeTick(t *testing.T) {
consumerSubName := "subSearch"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KTimeTick, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KTimeTick, 3, 3))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 3, 3))
//run stream
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, KTimeTick, KTimeTick, false)
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kTimeTick, internalPb.MsgType_kTimeTick, false)
}
func TestStream_BroadCast(t *testing.T) {
pulsarAddress := "pulsar://localhost:6650"
producerChannels := []string{"insert1", "insert2"}
@ -233,9 +231,9 @@ func TestStream_BroadCast(t *testing.T) {
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KTimeTick, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KTimeTick, 3, 3))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 3, 3))
//run stream
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, KTimeTick, KTimeTick, true)
initStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, &msgPack, internalPb.MsgType_kTimeTick, internalPb.MsgType_kTimeTick, true)
}

View File

@ -14,14 +14,14 @@ func TestNewStream_Insert(t *testing.T) {
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1))
inputStream := NewInputStream(pulsarAddress, producerChannels, false)
outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false)
(*inputStream).SetMsgMarshaler(GetMarshaler(KInsert), nil)
(*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kInsert), nil)
(*inputStream).SetRepackFunc(repackFunc)
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(KInsert))
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kInsert))
(*outputStream).Start()
//send msgPack
@ -52,14 +52,14 @@ func TestNewStream_Delete(t *testing.T) {
consumerSubName := "subDelete"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KDelete, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KDelete, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kDelete, 1, 1))
inputStream := NewInputStream(pulsarAddress, producerChannels, false)
outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false)
(*inputStream).SetMsgMarshaler(GetMarshaler(KDelete), nil)
(*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kDelete), nil)
(*inputStream).SetRepackFunc(repackFunc)
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(KDelete))
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kDelete))
(*outputStream).Start()
//send msgPack
@ -90,14 +90,14 @@ func TestNewStream_Search(t *testing.T) {
consumerSubName := "subSearch"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearch, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearch, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearch, 1, 1))
inputStream := NewInputStream(pulsarAddress, producerChannels, false)
outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false)
(*inputStream).SetMsgMarshaler(GetMarshaler(KSearch), nil)
(*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kSearch), nil)
(*inputStream).SetRepackFunc(repackFunc)
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(KSearch))
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kSearch))
(*outputStream).Start()
//send msgPack
@ -128,14 +128,14 @@ func TestNewStream_SearchResult(t *testing.T) {
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearchResult, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KSearchResult, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kSearchResult, 1, 1))
inputStream := NewInputStream(pulsarAddress, producerChannels, false)
outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false)
(*inputStream).SetMsgMarshaler(GetMarshaler(KSearchResult), nil)
(*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kSearchResult), nil)
(*inputStream).SetRepackFunc(repackFunc)
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(KSearchResult))
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kSearchResult))
(*outputStream).Start()
//send msgPack
@ -166,14 +166,14 @@ func TestNewStream_TimeTick(t *testing.T) {
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KTimeTick, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KTimeTick, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kTimeTick, 1, 1))
inputStream := NewInputStream(pulsarAddress, producerChannels, false)
outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, false)
(*inputStream).SetMsgMarshaler(GetMarshaler(KTimeTick), nil)
(*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kTimeTick), nil)
(*inputStream).SetRepackFunc(repackFunc)
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(KTimeTick))
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kTimeTick))
(*outputStream).Start()
//send msgPack
@ -203,8 +203,8 @@ func TestNewTtStream_Insert_TimeSync(t *testing.T) {
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 1, 1))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 0, 0))
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 1, 1))
insertRequest := internalPb.InsertRequest{
MsgType: internalPb.MsgType_kTimeTick,
@ -214,7 +214,7 @@ func TestNewTtStream_Insert_TimeSync(t *testing.T) {
SegmentId: 1,
ChannelId: 1,
ProxyId: 1,
Timestamps: []uint64{1},
Timestamps: []Timestamp{1},
}
insertMsg := InsertTask{
HashValues: []int32{2},
@ -226,9 +226,9 @@ func TestNewTtStream_Insert_TimeSync(t *testing.T) {
inputStream := NewInputStream(pulsarAddress, producerChannels, false)
outputStream := NewOutputStream(pulsarAddress, 100, 100, consumerChannels, consumerSubName, true)
(*inputStream).SetMsgMarshaler(GetMarshaler(KInsert), nil)
(*inputStream).SetMsgMarshaler(GetMarshaler(internalPb.MsgType_kInsert), nil)
(*inputStream).SetRepackFunc(repackFunc)
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(KInsert))
(*outputStream).SetMsgMarshaler(nil, GetMarshaler(internalPb.MsgType_kInsert))
(*outputStream).Start()
//send msgPack
@ -245,7 +245,7 @@ func TestNewTtStream_Insert_TimeSync(t *testing.T) {
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v)
}
}
if receiveCount + 1 >= len(msgPack.Msgs) {
if receiveCount+1 >= len(msgPack.Msgs) {
break
}
}

View File

@ -2,21 +2,9 @@ package msgstream
import (
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
. "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type MsgType uint32
const (
KInsert MsgType = 400
KDelete MsgType = 401
KSearch MsgType = 500
KSearchResult MsgType = 1000
KSegmentStatics MsgType = 1100
KTimeTick MsgType = 1200
KTimeSync MsgType = 1201
)
type MsgType = internalPb.MsgType
type TsMsg interface {
SetTs(ts Timestamp)
@ -67,10 +55,7 @@ func (it InsertTask) EndTs() Timestamp {
}
func (it InsertTask) Type() MsgType {
if it.MsgType == internalPb.MsgType_kTimeTick {
return KTimeTick
}
return KInsert
return it.MsgType
}
func (it InsertTask) HashKeys() []int32 {
@ -118,10 +103,8 @@ func (dt DeleteTask) EndTs() Timestamp {
}
func (dt DeleteTask) Type() MsgType {
if dt.MsgType == internalPb.MsgType_kTimeTick {
return KTimeTick
}
return KDelete
return dt.MsgType
}
func (dt DeleteTask) HashKeys() []int32 {
@ -147,10 +130,7 @@ func (st SearchTask) EndTs() Timestamp {
}
func (st SearchTask) Type() MsgType {
if st.MsgType == internalPb.MsgType_kTimeTick {
return KTimeTick
}
return KSearch
return st.MsgType
}
func (st SearchTask) HashKeys() []int32 {
@ -176,7 +156,7 @@ func (srt SearchResultTask) EndTs() Timestamp {
}
func (srt SearchResultTask) Type() MsgType {
return KSearchResult
return srt.MsgType
}
func (srt SearchResultTask) HashKeys() []int32 {
@ -202,25 +182,9 @@ func (tst TimeTickTask) EndTs() Timestamp {
}
func (tst TimeTickTask) Type() MsgType {
return KTimeTick
return tst.MsgType
}
func (tst TimeTickTask) HashKeys() []int32 {
return tst.HashValues
}
///////////////////////////////////////////Key2Seg//////////////////////////////////////////
//type Key2SegTask struct {
// internalPb.Key2SegMsg
//}
//
////TODO::Key2SegMsg don't have timestamp
//func (k2st Key2SegTask) SetTs(ts Timestamp) {}
//
//func (k2st Key2SegTask) Ts() Timestamp {
// return Timestamp(0)
//}
//
//func (k2st Key2SegTask) Type() MsgType {
// return
//}

View File

@ -28,10 +28,10 @@ enum MsgType {
/* Query */
kSearch = 500;
kSearchResult = 501;
/* System Control */
kTimeTick = 1200;
kTimeSync = 1201;
}
enum PeerRole {
@ -196,18 +196,20 @@ message SearchRequest {
}
message SearchResult {
common.Status status = 1;
int64 req_id = 2;
int64 proxy_id = 3;
int64 query_node_id = 4;
uint64 timestamp = 5;
int64 result_channel_id = 6;
repeated service.Hits hits = 7;
MsgType msg_type = 1;
common.Status status = 2;
int64 req_id = 3;
int64 proxy_id = 4;
int64 query_node_id = 5;
uint64 timestamp = 6;
int64 result_channel_id = 7;
repeated service.Hits hits = 8;
}
message TimeTickMsg {
int64 peer_id = 1;
uint64 timestamp = 2;
MsgType msg_type = 1;
int64 peer_id = 2;
uint64 timestamp = 3;
}
@ -230,4 +232,4 @@ message SegmentStatistics {
int64 segment_id = 1;
int64 memory_size = 2;
int64 num_rows = 3;
}
}

View File

@ -42,10 +42,10 @@ const (
MsgType_kInsert MsgType = 400
MsgType_kDelete MsgType = 401
// Query
MsgType_kSearch MsgType = 500
MsgType_kSearch MsgType = 500
MsgType_kSearchResult MsgType = 501
// System Control
MsgType_kTimeTick MsgType = 1200
MsgType_kTimeSync MsgType = 1201
)
var MsgType_name = map[int32]string{
@ -63,8 +63,8 @@ var MsgType_name = map[int32]string{
400: "kInsert",
401: "kDelete",
500: "kSearch",
501: "kSearchResult",
1200: "kTimeTick",
1201: "kTimeSync",
}
var MsgType_value = map[string]int32{
@ -82,8 +82,8 @@ var MsgType_value = map[string]int32{
"kInsert": 400,
"kDelete": 401,
"kSearch": 500,
"kSearchResult": 501,
"kTimeTick": 1200,
"kTimeSync": 1201,
}
func (x MsgType) String() string {
@ -1325,13 +1325,14 @@ func (m *SearchRequest) GetQuery() *commonpb.Blob {
}
type SearchResult struct {
Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
ReqId int64 `protobuf:"varint,2,opt,name=req_id,json=reqId,proto3" json:"req_id,omitempty"`
ProxyId int64 `protobuf:"varint,3,opt,name=proxy_id,json=proxyId,proto3" json:"proxy_id,omitempty"`
QueryNodeId int64 `protobuf:"varint,4,opt,name=query_node_id,json=queryNodeId,proto3" json:"query_node_id,omitempty"`
Timestamp uint64 `protobuf:"varint,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
ResultChannelId int64 `protobuf:"varint,6,opt,name=result_channel_id,json=resultChannelId,proto3" json:"result_channel_id,omitempty"`
Hits []*servicepb.Hits `protobuf:"bytes,7,rep,name=hits,proto3" json:"hits,omitempty"`
MsgType MsgType `protobuf:"varint,1,opt,name=msg_type,json=msgType,proto3,enum=milvus.proto.internal.MsgType" json:"msg_type,omitempty"`
Status *commonpb.Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
ReqId int64 `protobuf:"varint,3,opt,name=req_id,json=reqId,proto3" json:"req_id,omitempty"`
ProxyId int64 `protobuf:"varint,4,opt,name=proxy_id,json=proxyId,proto3" json:"proxy_id,omitempty"`
QueryNodeId int64 `protobuf:"varint,5,opt,name=query_node_id,json=queryNodeId,proto3" json:"query_node_id,omitempty"`
Timestamp uint64 `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
ResultChannelId int64 `protobuf:"varint,7,opt,name=result_channel_id,json=resultChannelId,proto3" json:"result_channel_id,omitempty"`
Hits []*servicepb.Hits `protobuf:"bytes,8,rep,name=hits,proto3" json:"hits,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -1362,6 +1363,13 @@ func (m *SearchResult) XXX_DiscardUnknown() {
var xxx_messageInfo_SearchResult proto.InternalMessageInfo
func (m *SearchResult) GetMsgType() MsgType {
if m != nil {
return m.MsgType
}
return MsgType_kNone
}
func (m *SearchResult) GetStatus() *commonpb.Status {
if m != nil {
return m.Status
@ -1412,8 +1420,9 @@ func (m *SearchResult) GetHits() []*servicepb.Hits {
}
type TimeTickMsg struct {
PeerId int64 `protobuf:"varint,1,opt,name=peer_id,json=peerId,proto3" json:"peer_id,omitempty"`
Timestamp uint64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
MsgType MsgType `protobuf:"varint,1,opt,name=msg_type,json=msgType,proto3,enum=milvus.proto.internal.MsgType" json:"msg_type,omitempty"`
PeerId int64 `protobuf:"varint,2,opt,name=peer_id,json=peerId,proto3" json:"peer_id,omitempty"`
Timestamp uint64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -1444,6 +1453,13 @@ func (m *TimeTickMsg) XXX_DiscardUnknown() {
var xxx_messageInfo_TimeTickMsg proto.InternalMessageInfo
func (m *TimeTickMsg) GetMsgType() MsgType {
if m != nil {
return m.MsgType
}
return MsgType_kNone
}
func (m *TimeTickMsg) GetPeerId() int64 {
if m != nil {
return m.PeerId
@ -1661,79 +1677,80 @@ func init() {
func init() { proto.RegisterFile("internal_msg.proto", fileDescriptor_7eb37f6b80b23116) }
var fileDescriptor_7eb37f6b80b23116 = []byte{
// 1181 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x58, 0x4b, 0x6f, 0x1c, 0xc5,
0x13, 0xcf, 0xec, 0xd3, 0x5b, 0xeb, 0x5d, 0x8f, 0xdb, 0xf6, 0xdf, 0x9b, 0xfc, 0x21, 0x31, 0x13,
0x24, 0xac, 0x48, 0xd8, 0xc2, 0xe1, 0x40, 0xae, 0xc9, 0x1e, 0xb2, 0x44, 0x8e, 0xac, 0x59, 0x0b,
0x24, 0x24, 0x34, 0x9a, 0x9d, 0x29, 0x66, 0x5b, 0xf3, 0xe8, 0x71, 0x77, 0xaf, 0xcd, 0xfa, 0x0b,
0x70, 0x05, 0x71, 0xe4, 0xc6, 0x27, 0x80, 0x3b, 0x1f, 0x80, 0xd7, 0x9d, 0x2f, 0x01, 0x82, 0x48,
0xa0, 0x5c, 0x51, 0xf7, 0xcc, 0x3e, 0x66, 0xfd, 0xe0, 0x19, 0x64, 0xc9, 0xb7, 0xa9, 0x9a, 0x9e,
0xae, 0xfa, 0xfd, 0xea, 0xb1, 0x55, 0x0b, 0x84, 0x26, 0x12, 0x79, 0xe2, 0x46, 0x4e, 0x2c, 0x82,
0x9d, 0x94, 0x33, 0xc9, 0xc8, 0x46, 0x4c, 0xa3, 0xe3, 0x91, 0xc8, 0xa4, 0x9d, 0xc9, 0x81, 0x5b,
0xcb, 0x1e, 0x8b, 0x63, 0x96, 0x64, 0xea, 0x5b, 0xab, 0x02, 0xf9, 0x31, 0xf5, 0x70, 0xf6, 0x9d,
0xc5, 0xa0, 0xd1, 0xf3, 0x6d, 0x3c, 0x1a, 0xa1, 0x90, 0x64, 0x13, 0xea, 0x29, 0x22, 0x77, 0xa8,
0xdf, 0x31, 0xb6, 0x8c, 0xed, 0xb2, 0x5d, 0x53, 0x62, 0xcf, 0x27, 0xf7, 0xa1, 0xc2, 0x59, 0x84,
0x9d, 0xd2, 0x96, 0xb1, 0xdd, 0xde, 0xbb, 0xb3, 0x73, 0xae, 0xb1, 0x9d, 0x03, 0x44, 0x6e, 0xb3,
0x08, 0x6d, 0x7d, 0x98, 0xac, 0x43, 0xd5, 0x63, 0xa3, 0x44, 0x76, 0xca, 0x5b, 0xc6, 0x76, 0xcb,
0xce, 0x04, 0x2b, 0x00, 0x50, 0x06, 0x45, 0xca, 0x12, 0x81, 0xe4, 0x3e, 0xd4, 0x84, 0x74, 0xe5,
0x48, 0x68, 0x83, 0xcd, 0xbd, 0xff, 0x17, 0xaf, 0xce, 0xbd, 0xef, 0xeb, 0x23, 0x76, 0x7e, 0x94,
0xb4, 0xa1, 0x44, 0x7d, 0xed, 0x4b, 0xd9, 0x2e, 0x51, 0xff, 0x02, 0x43, 0x29, 0xc0, 0xa1, 0x60,
0xff, 0x25, 0xb4, 0x63, 0x68, 0x6a, 0x8b, 0xff, 0x04, 0xdb, 0x4b, 0xd0, 0x90, 0x34, 0x46, 0x21,
0xdd, 0x38, 0xd5, 0x3e, 0x55, 0xec, 0x99, 0xe2, 0x02, 0xbb, 0x3f, 0x18, 0xb0, 0xf9, 0x88, 0xa3,
0x2b, 0xf1, 0x11, 0x8b, 0x22, 0xf4, 0x24, 0x65, 0xc9, 0x04, 0xf7, 0x03, 0x58, 0x8a, 0x45, 0xe0,
0xc8, 0x71, 0x8a, 0xda, 0x8d, 0xf6, 0xde, 0xed, 0x0b, 0x20, 0xee, 0x8b, 0xe0, 0x70, 0x9c, 0xa2,
0x5d, 0x8f, 0xb3, 0x07, 0xb2, 0x01, 0x35, 0x8e, 0x47, 0xce, 0x94, 0xea, 0x2a, 0xc7, 0xa3, 0x9e,
0x5f, 0xf4, 0xb0, 0xbc, 0xe8, 0xe1, 0x4d, 0x58, 0x4a, 0x39, 0xfb, 0x70, 0xac, 0x3e, 0xab, 0xe8,
0xcf, 0xea, 0x5a, 0xee, 0xf9, 0xe4, 0x0d, 0xa8, 0x09, 0x6f, 0x88, 0xb1, 0xdb, 0xa9, 0x6a, 0x3e,
0x6e, 0x9e, 0xcb, 0xc7, 0xc3, 0x88, 0x0d, 0xec, 0xfc, 0xa0, 0xf5, 0xcc, 0x80, 0x8d, 0x2e, 0x67,
0xe9, 0x95, 0xc6, 0xb5, 0x0f, 0x2b, 0xde, 0xd4, 0x3f, 0x27, 0x71, 0x63, 0xcc, 0x01, 0xbe, 0x5a,
0xf4, 0x28, 0x2f, 0xbe, 0x9d, 0x19, 0x98, 0xa7, 0x6e, 0x8c, 0x76, 0xdb, 0x2b, 0xc8, 0xd6, 0x2f,
0x06, 0xac, 0x3f, 0x76, 0xc5, 0x75, 0x82, 0xfc, 0x9b, 0x01, 0x37, 0xbb, 0x28, 0x3c, 0x4e, 0x07,
0x78, 0x9d, 0x70, 0x7f, 0x6e, 0xc0, 0x46, 0x7f, 0xc8, 0x4e, 0xae, 0x32, 0x66, 0xeb, 0x67, 0x03,
0xfe, 0x97, 0x75, 0x97, 0x03, 0x97, 0x4b, 0x7a, 0x45, 0x23, 0xf3, 0x36, 0xb4, 0xd3, 0x89, 0x7b,
0xf3, 0x81, 0xb9, 0x7b, 0x7e, 0x60, 0xa6, 0x50, 0x74, 0x5c, 0x5a, 0xe9, 0xbc, 0x68, 0xfd, 0x64,
0xc0, 0xba, 0xea, 0x3a, 0xd7, 0x05, 0xef, 0x8f, 0x06, 0xac, 0x3d, 0x76, 0xc5, 0x75, 0x81, 0xfb,
0xcc, 0x80, 0xce, 0xa4, 0xdb, 0x5c, 0x17, 0xcc, 0xea, 0x47, 0x45, 0x75, 0x9a, 0xab, 0x8c, 0xf7,
0x5f, 0x6e, 0xae, 0xcf, 0x4b, 0xd0, 0xea, 0x25, 0x02, 0xb9, 0x7c, 0x71, 0x58, 0x5f, 0x3b, 0xeb,
0xb2, 0x42, 0xdc, 0x58, 0x74, 0x86, 0xdc, 0x85, 0x59, 0x40, 0x1c, 0xe9, 0x06, 0x1a, 0x7b, 0xc3,
0x5e, 0x9e, 0x2a, 0x0f, 0xdd, 0x80, 0xbc, 0x0c, 0x20, 0x30, 0x88, 0x31, 0x91, 0xca, 0x50, 0x55,
0x1b, 0x6a, 0xe4, 0x9a, 0x9e, 0xaf, 0x5e, 0x7b, 0x43, 0x37, 0x49, 0x30, 0x52, 0xaf, 0x6b, 0xd9,
0xeb, 0x5c, 0xd3, 0xf3, 0x0b, 0xcc, 0xd6, 0x8b, 0xcc, 0xde, 0x06, 0x98, 0x46, 0x40, 0x74, 0x96,
0xb6, 0xca, 0xdb, 0x15, 0x7b, 0x4e, 0xa3, 0x86, 0x63, 0xce, 0x4e, 0x1c, 0xea, 0x8b, 0x4e, 0x63,
0xab, 0xac, 0x86, 0x63, 0xce, 0x4e, 0x7a, 0xbe, 0x20, 0x6f, 0xc2, 0x92, 0x7a, 0xe1, 0xbb, 0xd2,
0xed, 0xc0, 0x56, 0xf9, 0xf2, 0xa1, 0x4d, 0xdd, 0xd1, 0x75, 0xa5, 0x6b, 0x7d, 0x54, 0x82, 0x56,
0x17, 0x23, 0x94, 0x78, 0x05, 0x98, 0x2f, 0xb2, 0x56, 0xb9, 0x8c, 0xb5, 0xea, 0x65, 0xac, 0xd5,
0xce, 0xb0, 0xf6, 0x0a, 0x2c, 0xa7, 0x9c, 0xc6, 0x2e, 0x1f, 0x3b, 0x21, 0x8e, 0x45, 0xa7, 0xae,
0xa9, 0x6b, 0xe6, 0xba, 0x27, 0x38, 0x16, 0xd6, 0x73, 0x03, 0x5a, 0x7d, 0x74, 0xb9, 0x37, 0x7c,
0x71, 0x4c, 0xcc, 0x23, 0x28, 0x17, 0x11, 0x14, 0x4a, 0xb1, 0xb2, 0x58, 0x8a, 0xf7, 0x60, 0x95,
0xa3, 0x18, 0x45, 0xd2, 0x99, 0x23, 0x28, 0xe3, 0x60, 0x25, 0x7b, 0xf1, 0x68, 0x4a, 0xd3, 0x2e,
0x54, 0x8f, 0x46, 0xc8, 0xc7, 0x3a, 0xed, 0x2e, 0xcd, 0x82, 0xec, 0x9c, 0xf5, 0x69, 0x09, 0x96,
0x27, 0xc8, 0xd5, 0x55, 0x7f, 0x6f, 0x1b, 0xfa, 0xeb, 0x90, 0x2d, 0x68, 0x69, 0x07, 0x9c, 0x84,
0xf9, 0x38, 0x8b, 0x78, 0x53, 0x2b, 0x9f, 0x32, 0x1f, 0x17, 0x69, 0xa9, 0xfe, 0x29, 0x5a, 0x6a,
0xe7, 0xd3, 0xb2, 0x03, 0x95, 0x21, 0x95, 0x59, 0xe8, 0x9b, 0x7b, 0xb7, 0xce, 0xef, 0x53, 0x8f,
0xa9, 0x14, 0xb6, 0x3e, 0x67, 0x75, 0xa1, 0x79, 0x48, 0x63, 0x3c, 0xa4, 0x5e, 0xb8, 0x2f, 0x82,
0x8b, 0x97, 0xd2, 0x4b, 0xb7, 0x40, 0xeb, 0x33, 0x03, 0xea, 0x4f, 0x70, 0xbc, 0xd7, 0xc7, 0x40,
0x33, 0xa4, 0x4b, 0x37, 0xbf, 0xa1, 0xaa, 0x2b, 0x97, 0xdc, 0x81, 0xe6, 0x5c, 0x6e, 0xe6, 0xec,
0xc1, 0x2c, 0x35, 0xff, 0xb8, 0x4b, 0x53, 0xe1, 0x1c, 0xbb, 0x51, 0x4e, 0xe0, 0x92, 0x5d, 0xa7,
0xe2, 0x1d, 0x25, 0xaa, 0x9b, 0x67, 0x4d, 0x4a, 0x74, 0xaa, 0x3a, 0xe9, 0x61, 0xda, 0xa5, 0x84,
0xf5, 0x3e, 0x40, 0xee, 0x9c, 0x82, 0x38, 0x8b, 0xa0, 0x31, 0x1f, 0xc1, 0xb7, 0xa0, 0x1e, 0xe2,
0x78, 0x4f, 0x60, 0xd0, 0x29, 0x69, 0xee, 0x2e, 0xaa, 0x82, 0xfc, 0x2a, 0x7b, 0x72, 0xdc, 0x4a,
0x60, 0xb5, 0x9f, 0x19, 0x53, 0xb9, 0x42, 0x85, 0xa4, 0x9e, 0x58, 0xe8, 0x9c, 0xc6, 0x62, 0xe7,
0xbc, 0x03, 0xcd, 0x18, 0x63, 0xc6, 0xc7, 0x8e, 0xa0, 0xa7, 0x38, 0x61, 0x23, 0x53, 0xf5, 0xe9,
0x29, 0x2a, 0xbc, 0xc9, 0x28, 0x76, 0x38, 0x3b, 0x11, 0x93, 0x84, 0x4a, 0x46, 0xb1, 0xcd, 0x4e,
0xc4, 0xbd, 0xaf, 0x4a, 0x50, 0xcf, 0x4b, 0x91, 0x34, 0xa0, 0x1a, 0x3e, 0x65, 0x09, 0x9a, 0x37,
0xc8, 0x06, 0xac, 0x86, 0x8b, 0x3b, 0xb7, 0xe9, 0x93, 0x35, 0x58, 0x09, 0x8b, 0x0b, 0xab, 0x89,
0x84, 0x40, 0x3b, 0x2c, 0x6c, 0x74, 0xe6, 0x07, 0x64, 0x13, 0xd6, 0xc2, 0xb3, 0x2b, 0x8f, 0x19,
0x90, 0x75, 0x30, 0xc3, 0xe2, 0x4e, 0x20, 0xcc, 0x21, 0xd9, 0x00, 0x33, 0x5c, 0x18, 0xc2, 0xcd,
0xaf, 0x0d, 0xb2, 0x06, 0xed, 0xb0, 0x30, 0xa9, 0x9a, 0xdf, 0x18, 0x84, 0x40, 0x2b, 0x9c, 0x1f,
0xe7, 0xcc, 0x6f, 0x0d, 0xb2, 0x09, 0x24, 0x3c, 0x33, 0xf3, 0x98, 0xdf, 0x19, 0x64, 0x1d, 0x56,
0xc2, 0xc2, 0x60, 0x20, 0xcc, 0xef, 0x0d, 0xb2, 0x0c, 0xf5, 0x30, 0xfb, 0xed, 0x34, 0x3f, 0x2e,
0x6b, 0x29, 0xeb, 0xe7, 0xe6, 0x27, 0x99, 0x94, 0x55, 0xb6, 0xf9, 0x6b, 0x99, 0xb4, 0xa1, 0x11,
0x4e, 0x52, 0xda, 0xfc, 0xa2, 0x31, 0x95, 0xfb, 0xe3, 0xc4, 0x33, 0xbf, 0x6c, 0xdc, 0x7b, 0x00,
0x4b, 0x93, 0xff, 0x4e, 0x08, 0x40, 0x6d, 0xdf, 0x15, 0x12, 0xb9, 0x79, 0x43, 0x3d, 0xdb, 0xe8,
0xfa, 0xc8, 0x4d, 0x43, 0x3d, 0xbf, 0xcb, 0xa9, 0xd2, 0x97, 0x14, 0xc5, 0x07, 0xaa, 0x94, 0xcd,
0xf2, 0xc3, 0xee, 0x7b, 0x0f, 0x03, 0x2a, 0x87, 0xa3, 0x81, 0x6a, 0x0e, 0xbb, 0xa7, 0x34, 0x8a,
0xe8, 0xa9, 0x44, 0x6f, 0xb8, 0x9b, 0x65, 0xca, 0xeb, 0x3e, 0x15, 0x92, 0xd3, 0xc1, 0x48, 0xa2,
0xbf, 0x3b, 0xc9, 0x97, 0x5d, 0x9d, 0x3e, 0x53, 0x31, 0x1d, 0x0c, 0x6a, 0x5a, 0x73, 0xff, 0xf7,
0x00, 0x00, 0x00, 0xff, 0xff, 0x3a, 0x14, 0x7f, 0x35, 0x35, 0x13, 0x00, 0x00,
// 1199 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x58, 0x4b, 0x6f, 0x1c, 0x45,
0x10, 0xce, 0xec, 0xec, 0xc3, 0x5b, 0xeb, 0x5d, 0x8f, 0xdb, 0x36, 0xde, 0x04, 0x48, 0xcc, 0x04,
0x09, 0x2b, 0x12, 0xb6, 0x70, 0x38, 0x90, 0x6b, 0xe2, 0x43, 0x96, 0xc8, 0x51, 0x34, 0xb6, 0x40,
0x42, 0x42, 0xa3, 0xd9, 0x99, 0x62, 0xb6, 0x35, 0x8f, 0x1e, 0x77, 0xf7, 0xda, 0xac, 0x2f, 0x1c,
0xb9, 0xc2, 0x99, 0x1b, 0xbf, 0x80, 0x9f, 0xc0, 0x91, 0x97, 0xb8, 0xf2, 0x27, 0x40, 0x10, 0x89,
0x28, 0x57, 0xd4, 0x3d, 0xb3, 0x8f, 0x59, 0x3f, 0x88, 0x14, 0x82, 0x2c, 0xf9, 0xb6, 0x55, 0xd3,
0xd3, 0x55, 0xdf, 0x57, 0x5d, 0xdf, 0x74, 0x2d, 0x10, 0x9a, 0x4a, 0xe4, 0xa9, 0x17, 0xbb, 0x89,
0x08, 0xb7, 0x32, 0xce, 0x24, 0x23, 0x6b, 0x09, 0x8d, 0x8f, 0x86, 0x22, 0xb7, 0xb6, 0xc6, 0x0b,
0x6e, 0x2c, 0xfa, 0x2c, 0x49, 0x58, 0x9a, 0xbb, 0x6f, 0x2c, 0x0b, 0xe4, 0x47, 0xd4, 0xc7, 0xe9,
0x7b, 0x36, 0x83, 0x66, 0x2f, 0x70, 0xf0, 0x70, 0x88, 0x42, 0x92, 0x75, 0x68, 0x64, 0x88, 0xdc,
0xa5, 0x41, 0xd7, 0xd8, 0x30, 0x36, 0x4d, 0xa7, 0xae, 0xcc, 0x5e, 0x40, 0xee, 0x42, 0x95, 0xb3,
0x18, 0xbb, 0x95, 0x0d, 0x63, 0xb3, 0xb3, 0x73, 0x6b, 0xeb, 0xcc, 0x60, 0x5b, 0x4f, 0x10, 0xb9,
0xc3, 0x62, 0x74, 0xf4, 0x62, 0xb2, 0x0a, 0x35, 0x9f, 0x0d, 0x53, 0xd9, 0x35, 0x37, 0x8c, 0xcd,
0xb6, 0x93, 0x1b, 0x76, 0x08, 0xa0, 0x02, 0x8a, 0x8c, 0xa5, 0x02, 0xc9, 0x5d, 0xa8, 0x0b, 0xe9,
0xc9, 0xa1, 0xd0, 0x01, 0x5b, 0x3b, 0xaf, 0x97, 0xb7, 0x2e, 0xb2, 0xdf, 0xd7, 0x4b, 0x9c, 0x62,
0x29, 0xe9, 0x40, 0x85, 0x06, 0x3a, 0x17, 0xd3, 0xa9, 0xd0, 0xe0, 0x9c, 0x40, 0x19, 0xc0, 0x81,
0x60, 0xff, 0x27, 0xb4, 0x23, 0x68, 0xe9, 0x88, 0x2f, 0x83, 0xed, 0x0d, 0x68, 0x4a, 0x9a, 0xa0,
0x90, 0x5e, 0x92, 0xe9, 0x9c, 0xaa, 0xce, 0xd4, 0x71, 0x4e, 0xdc, 0xdf, 0x0c, 0x58, 0x7f, 0xc0,
0xd1, 0x93, 0xf8, 0x80, 0xc5, 0x31, 0xfa, 0x92, 0xb2, 0x74, 0x8c, 0xfb, 0x1e, 0x2c, 0x24, 0x22,
0x74, 0xe5, 0x28, 0x43, 0x9d, 0x46, 0x67, 0xe7, 0xe6, 0x39, 0x10, 0xf7, 0x44, 0x78, 0x30, 0xca,
0xd0, 0x69, 0x24, 0xf9, 0x0f, 0xb2, 0x06, 0x75, 0x8e, 0x87, 0xee, 0x84, 0xea, 0x1a, 0xc7, 0xc3,
0x5e, 0x50, 0xce, 0xd0, 0x9c, 0xcf, 0xf0, 0x3a, 0x2c, 0x64, 0x9c, 0x7d, 0x3e, 0x52, 0xaf, 0x55,
0xf5, 0x6b, 0x0d, 0x6d, 0xf7, 0x02, 0xf2, 0x1e, 0xd4, 0x85, 0x3f, 0xc0, 0xc4, 0xeb, 0xd6, 0x34,
0x1f, 0xd7, 0xcf, 0xe4, 0xe3, 0x7e, 0xcc, 0xfa, 0x4e, 0xb1, 0xd0, 0x7e, 0x6a, 0xc0, 0xda, 0x2e,
0x67, 0xd9, 0xa5, 0xc6, 0xb5, 0x07, 0x4b, 0xfe, 0x24, 0x3f, 0x37, 0xf5, 0x12, 0x2c, 0x00, 0xbe,
0x5d, 0xce, 0xa8, 0x68, 0xbe, 0xad, 0x29, 0x98, 0xc7, 0x5e, 0x82, 0x4e, 0xc7, 0x2f, 0xd9, 0xf6,
0x5f, 0x06, 0xac, 0x3e, 0xf4, 0xc4, 0x55, 0x82, 0xfc, 0xcc, 0x80, 0xeb, 0xbb, 0x28, 0x7c, 0x4e,
0xfb, 0x78, 0x95, 0x70, 0x7f, 0x6b, 0xc0, 0xda, 0xfe, 0x80, 0x1d, 0x5f, 0x66, 0xcc, 0xf6, 0x9f,
0x06, 0xbc, 0x96, 0xab, 0xcb, 0x13, 0x8f, 0x4b, 0x7a, 0x49, 0x2b, 0xf3, 0x21, 0x74, 0xb2, 0x71,
0x7a, 0xb3, 0x85, 0xb9, 0x7d, 0x76, 0x61, 0x26, 0x50, 0x74, 0x5d, 0xda, 0xd9, 0xac, 0x69, 0xff,
0x61, 0xc0, 0xaa, 0x52, 0x9d, 0xab, 0x82, 0xf7, 0x77, 0x03, 0x56, 0x1e, 0x7a, 0xe2, 0xaa, 0xc0,
0x7d, 0x6a, 0x40, 0x77, 0xac, 0x36, 0x57, 0x05, 0xb3, 0xfa, 0xa8, 0x28, 0xa5, 0xb9, 0xcc, 0x78,
0xff, 0x63, 0x71, 0x7d, 0x5e, 0x81, 0x76, 0x2f, 0x15, 0xc8, 0xe5, 0xab, 0xc3, 0xfa, 0xce, 0xe9,
0x94, 0x15, 0xe2, 0xe6, 0x7c, 0x32, 0xe4, 0x36, 0x4c, 0x0b, 0xe2, 0x4a, 0x2f, 0xd4, 0xd8, 0x9b,
0xce, 0xe2, 0xc4, 0x79, 0xe0, 0x85, 0xe4, 0x4d, 0x00, 0x81, 0x61, 0x82, 0xa9, 0x54, 0x81, 0x6a,
0x3a, 0x50, 0xb3, 0xf0, 0xf4, 0x02, 0xf5, 0xd8, 0x1f, 0x78, 0x69, 0x8a, 0xb1, 0x7a, 0x5c, 0xcf,
0x1f, 0x17, 0x9e, 0x5e, 0x50, 0x62, 0xb6, 0x51, 0x66, 0xf6, 0x26, 0xc0, 0xa4, 0x02, 0xa2, 0xbb,
0xb0, 0x61, 0x6e, 0x56, 0x9d, 0x19, 0x8f, 0xba, 0x1c, 0x73, 0x76, 0xec, 0xd2, 0x40, 0x74, 0x9b,
0x1b, 0xa6, 0xba, 0x1c, 0x73, 0x76, 0xdc, 0x0b, 0x04, 0x79, 0x1f, 0x16, 0xd4, 0x83, 0xc0, 0x93,
0x5e, 0x17, 0x36, 0xcc, 0x8b, 0x2f, 0x6d, 0x6a, 0x8f, 0x5d, 0x4f, 0x7a, 0xf6, 0x97, 0x15, 0x68,
0xef, 0x62, 0x8c, 0x12, 0x2f, 0x01, 0xf3, 0x65, 0xd6, 0xaa, 0x17, 0xb1, 0x56, 0xbb, 0x88, 0xb5,
0xfa, 0x29, 0xd6, 0xde, 0x82, 0xc5, 0x8c, 0xd3, 0xc4, 0xe3, 0x23, 0x37, 0xc2, 0x91, 0xe8, 0x36,
0x34, 0x75, 0xad, 0xc2, 0xf7, 0x08, 0x47, 0xc2, 0x7e, 0x6e, 0x40, 0x7b, 0x1f, 0x3d, 0xee, 0x0f,
0x5e, 0x1d, 0x13, 0xb3, 0x08, 0xcc, 0x32, 0x82, 0x52, 0x2b, 0x56, 0xe7, 0x5b, 0xf1, 0x0e, 0x2c,
0x73, 0x14, 0xc3, 0x58, 0xba, 0x33, 0x04, 0xe5, 0x1c, 0x2c, 0xe5, 0x0f, 0x1e, 0x4c, 0x68, 0xda,
0x86, 0xda, 0xe1, 0x10, 0xf9, 0x48, 0x1f, 0xbb, 0x0b, 0x4f, 0x41, 0xbe, 0xce, 0xfe, 0xb5, 0x02,
0x8b, 0x63, 0xe4, 0x6a, 0xab, 0x97, 0x01, 0x3e, 0x1d, 0xa4, 0x2a, 0x2f, 0x3e, 0x48, 0x4d, 0xd9,
0x32, 0xcf, 0x63, 0x6b, 0x4e, 0x7f, 0x6c, 0x68, 0xeb, 0xdc, 0xdd, 0x94, 0x05, 0x38, 0xe5, 0xa2,
0xa5, 0x9d, 0x8f, 0x59, 0x80, 0xf3, 0x8c, 0xd6, 0x5f, 0x88, 0xd1, 0xc6, 0xd9, 0x8c, 0x6e, 0x41,
0x75, 0x40, 0x65, 0xde, 0x8d, 0xad, 0x9d, 0x1b, 0x67, 0x4b, 0xdc, 0x43, 0x2a, 0x85, 0xa3, 0xd7,
0xd9, 0x5f, 0x40, 0xeb, 0x80, 0x26, 0x78, 0x40, 0xfd, 0x68, 0x4f, 0x84, 0x2f, 0x43, 0xe7, 0xcc,
0x28, 0x5c, 0x29, 0x8d, 0xc2, 0x17, 0x2a, 0xb7, 0xfd, 0x8d, 0x01, 0x8d, 0x47, 0x38, 0xda, 0xd9,
0xc7, 0x50, 0x93, 0xab, 0x05, 0xa3, 0x18, 0xa6, 0x6b, 0x5a, 0x2f, 0xc8, 0x2d, 0x68, 0xcd, 0x74,
0x44, 0xb1, 0x3b, 0x4c, 0x1b, 0xe2, 0xdf, 0xbf, 0x0d, 0x54, 0xb8, 0x47, 0x5e, 0x5c, 0xd4, 0x66,
0xc1, 0x69, 0x50, 0xf1, 0x91, 0x32, 0xd5, 0xce, 0x53, 0x69, 0x14, 0xdd, 0x9a, 0x6e, 0x35, 0x98,
0x68, 0xa3, 0xb0, 0x3f, 0x05, 0x28, 0x92, 0x53, 0xec, 0x4c, 0x8b, 0x6f, 0xcc, 0x16, 0xff, 0x03,
0x68, 0x44, 0x38, 0xda, 0x11, 0x18, 0x76, 0x2b, 0x9a, 0xf6, 0xf3, 0x38, 0x2b, 0xb6, 0x72, 0xc6,
0xcb, 0xed, 0x14, 0x96, 0xf7, 0xf3, 0x60, 0xea, 0x98, 0x51, 0x21, 0xa9, 0x2f, 0xe6, 0xf4, 0xda,
0x98, 0xd7, 0xeb, 0x5b, 0xd0, 0x4a, 0x30, 0x61, 0x7c, 0xe4, 0x0a, 0x7a, 0x82, 0x63, 0x36, 0x72,
0xd7, 0x3e, 0x3d, 0x41, 0x85, 0x37, 0x1d, 0x26, 0x2e, 0x67, 0xc7, 0x62, 0xdc, 0xb9, 0xe9, 0x30,
0x71, 0xd8, 0xb1, 0xb8, 0xf3, 0x7d, 0x05, 0x1a, 0x45, 0xe1, 0x48, 0x13, 0x6a, 0xd1, 0x63, 0x96,
0xa2, 0x75, 0x8d, 0xac, 0xc1, 0x72, 0x34, 0x3f, 0xe9, 0x5b, 0x01, 0x59, 0x81, 0xa5, 0xa8, 0x3c,
0x26, 0x5b, 0x48, 0x08, 0x74, 0xa2, 0xd2, 0x1c, 0x69, 0x7d, 0x46, 0xd6, 0x61, 0x25, 0x3a, 0x3d,
0x68, 0x59, 0x21, 0x59, 0x05, 0x2b, 0x2a, 0x4f, 0x22, 0xc2, 0x1a, 0x90, 0x35, 0xb0, 0xa2, 0xb9,
0xab, 0xbf, 0xf5, 0x83, 0x41, 0x56, 0xa0, 0x13, 0x95, 0xee, 0xc7, 0xd6, 0x8f, 0x06, 0x21, 0xd0,
0x8e, 0x66, 0x2f, 0x91, 0xd6, 0x4f, 0x06, 0x59, 0x07, 0x12, 0x9d, 0xba, 0x69, 0x59, 0x3f, 0x1b,
0x64, 0x15, 0x96, 0xa2, 0xd2, 0x75, 0x44, 0x58, 0xbf, 0x18, 0x64, 0x11, 0x1a, 0x51, 0xfe, 0xc5,
0xb6, 0xbe, 0x32, 0xb5, 0x95, 0x7f, 0x45, 0xac, 0xaf, 0x73, 0x2b, 0xd7, 0x13, 0xeb, 0x6f, 0x53,
0x07, 0x9b, 0x55, 0x17, 0xeb, 0x99, 0x49, 0x3a, 0xd0, 0x8c, 0xc6, 0x1d, 0x62, 0x7d, 0xd7, 0xbc,
0x73, 0x0f, 0x16, 0xc6, 0xff, 0xda, 0x10, 0x80, 0xfa, 0x9e, 0x27, 0x24, 0x72, 0xeb, 0x9a, 0xfa,
0xed, 0xa0, 0x17, 0x20, 0xb7, 0x0c, 0xf5, 0xfb, 0x63, 0x4e, 0x95, 0xbf, 0xa2, 0x68, 0x7e, 0xa2,
0x94, 0xc0, 0x32, 0xef, 0xef, 0x7e, 0x72, 0x3f, 0xa4, 0x72, 0x30, 0xec, 0x2b, 0x6d, 0xd9, 0x3e,
0xa1, 0x71, 0x4c, 0x4f, 0x24, 0xfa, 0x83, 0xed, 0xfc, 0xb4, 0xbc, 0x1b, 0x50, 0x21, 0x39, 0xed,
0x0f, 0x25, 0x06, 0xdb, 0xe3, 0x33, 0xb3, 0xad, 0x8f, 0xd0, 0xc4, 0xcc, 0xfa, 0xfd, 0xba, 0xf6,
0xdc, 0xfd, 0x27, 0x00, 0x00, 0xff, 0xff, 0x8c, 0x75, 0x4d, 0x29, 0xaf, 0x13, 0x00, 0x00,
}

View File

@ -10,7 +10,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
pb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type manipulationReq struct {
@ -21,15 +20,15 @@ type manipulationReq struct {
}
// TsMsg interfaces
func (req *manipulationReq) Ts() (typeutil.Timestamp, error) {
func (req *manipulationReq) Ts() (Timestamp, error) {
if req.msgs == nil {
return 0, errors.New("No typed manipulation request message in ")
}
return typeutil.Timestamp(req.msgs[0].Timestamp), nil
return req.msgs[0].Timestamp, nil
}
func (req *manipulationReq) SetTs(ts typeutil.Timestamp) {
func (req *manipulationReq) SetTs(ts Timestamp) {
for _, msg := range req.msgs {
msg.Timestamp = uint64(ts)
msg.Timestamp = ts
}
}

View File

@ -4,7 +4,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type insertTask struct {
@ -29,8 +28,8 @@ func (it *insertTask) Execute() error {
SegmentId: 1, // TODO: use SegIdAssigner instead
// TODO: ChannelID
ProxyId: it.ProxyId,
Timestamps: []typeutil.Timestamp{ts},
RowIds: []int64{1}, // TODO: use RowIdAllocator instead
Timestamps: []Timestamp{ts},
RowIds: []UniqueID{1}, // TODO: use RowIdAllocator instead
RowData: it.rowBatch.RowData,
}
pulsarInsertTask := msgstream.InsertTask{

View File

@ -4,17 +4,21 @@ import (
"context"
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
const timeWindow = time.Second
type Timestamp = typeutil.Timestamp
type TSOClient struct {
lastTs uint64
lastTs Timestamp
mux sync.Mutex
}
// window is 1000ms default
func (c *TSOClient) GetTimeStamp(ctx context.Context, n uint64) (ts uint64, count uint64, window time.Duration, err error) {
func (c *TSOClient) GetTimeStamp(ctx context.Context, n Timestamp) (ts Timestamp, count uint64, window time.Duration, err error) {
c.mux.Lock()
defer c.mux.Unlock()
ts = c.lastTs

View File

@ -14,6 +14,10 @@ import (
etcd "go.etcd.io/etcd/clientv3"
)
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type IntPrimaryKey = typeutil.IntPrimaryKey
type BaseRequest interface {
Type() internalpb.MsgType
PreExecute() commonpb.Status
@ -35,7 +39,7 @@ type ProxyOptions struct {
resultTopic string
resultGroup string
numReaderNode int
proxyId int64 //start from 1
proxyId UniqueID //start from 1
etcdEndpoints []string
//timestamporacle
@ -45,7 +49,7 @@ type ProxyOptions struct {
//timetick
timeTickInterval uint64
timeTickTopic string
timeTickPeerId int64 //start from 1
timeTickPeerId UniqueID //start from 1
// inner member
proxyServer *proxyServer
@ -75,13 +79,13 @@ func ReadProxyOptionsFromConfig() (*ProxyOptions, error) {
resultTopic: conf.Config.Proxy.PulsarTopics.ResultTopic,
resultGroup: conf.Config.Proxy.PulsarTopics.ResultGroup,
numReaderNode: conf.Config.Proxy.NumReaderNodes,
proxyId: int64(conf.Config.Proxy.ProxyId),
proxyId: UniqueID(conf.Config.Proxy.ProxyId),
etcdEndpoints: []string{conf.Config.Etcd.Address + ":" + strconv.Itoa(int(conf.Config.Etcd.Port))},
tsoRootPath: etcdRootPath,
tsoSaveInterval: uint64(conf.Config.Proxy.TosSaveInterval),
timeTickInterval: uint64(conf.Config.Proxy.TimeTickInterval),
timeTickTopic: conf.Config.Proxy.PulsarTopics.TimeTickTopic,
timeTickPeerId: int64(conf.Config.Proxy.ProxyId),
timeTickPeerId: UniqueID(conf.Config.Proxy.ProxyId),
}, nil
}
@ -151,8 +155,8 @@ func StartProxy(opt *ProxyOptions) error {
pulsarProducer: ttProducer,
peer_id: opt.timeTickPeerId,
ctx: opt.ctx,
areRequestsDelivered: func(ts typeutil.Timestamp) bool { return srv.reqSch.AreRequestsDelivered(ts, 2) },
getTimestamp: func() (typeutil.Timestamp, error) {
areRequestsDelivered: func(ts Timestamp) bool { return srv.reqSch.AreRequestsDelivered(ts, 2) },
getTimestamp: func() (Timestamp, error) {
ts, st := tso.AllocOne()
return ts, st
},

View File

@ -230,13 +230,13 @@ func TestProxyNode(t *testing.T) {
{Blob: uint64ToBytes(14)},
{Blob: uint64ToBytes(15)},
},
EntityIdArray: []int64{10, 11, 12, 13, 14, 15},
EntityIdArray: []UniqueID{10, 11, 12, 13, 14, 15},
PartitionTag: "",
ExtraParams: nil,
}
deleteParm := pb.DeleteByIDParam{
CollectionName: "cm100",
IdArray: []int64{20, 21},
IdArray: []UniqueID{20, 21},
}
searchParm := pb.SearchParam{
@ -266,7 +266,7 @@ func TestProxyNode(t *testing.T) {
Status: &pb.Status{ErrorCode: pb.ErrorCode_SUCCESS},
Entities: &pb.Entities{
Status: &pb.Status{ErrorCode: pb.ErrorCode_SUCCESS},
Ids: []int64{11, 13, 15},
Ids: []UniqueID{11, 13, 15},
ValidRow: []bool{true, true, true},
RowsData: []*pb.RowData{
{Blob: uint64ToBytes(11)},
@ -286,7 +286,7 @@ func TestProxyNode(t *testing.T) {
Status: &pb.Status{ErrorCode: pb.ErrorCode_SUCCESS},
Entities: &pb.Entities{
Status: &pb.Status{ErrorCode: pb.ErrorCode_SUCCESS},
Ids: []int64{12, 14, 16},
Ids: []UniqueID{12, 14, 16},
ValidRow: []bool{true, false, true},
RowsData: []*pb.RowData{
{Blob: uint64ToBytes(12)},
@ -326,7 +326,7 @@ func TestProxyNode(t *testing.T) {
assert.Equal(t, insertR.EntityIdArray[i], int64(i+10))
}
var insertPrimaryKey []int64
var insertPrimaryKey []IntPrimaryKey
readerM1, ok := <-reader.Chan()
assert.True(t, ok)

View File

@ -63,7 +63,7 @@ func (s *proxyServer) restartQueryRoutine(buf_size int) error {
return err
}
resultMap := make(map[int64]*queryReq)
resultMap := make(map[UniqueID]*queryReq)
go func() {
defer result.Close()
@ -165,63 +165,6 @@ func (s *proxyServer) reduceResults(query *queryReq) *servicepb.QueryResult {
return &result
}
//var entities []*struct {
// Idx int64
// Score float32
// Hit *servicepb.Hits
//}
//var rows int
//
//result_err := func(msg string) *pb.QueryResult {
// return &pb.QueryResult{
// Status: &pb.Status{
// ErrorCode: pb.ErrorCode_UNEXPECTED_ERROR,
// Reason: msg,
// },
// }
//}
//for _, r := range results {
// for i := 0; i < len(r.Hits); i++ {
// entity := struct {
// Ids int64
// ValidRow bool
// RowsData *pb.RowData
// Scores float32
// Distances float32
// }{
// Ids: r.Entities.Ids[i],
// ValidRow: r.Entities.ValidRow[i],
// RowsData: r.Entities.RowsData[i],
// Scores: r.Scores[i],
// Distances: r.Distances[i],
// }
// entities = append(entities, &entity)
// }
//}
//sort.Slice(entities, func(i, j int) bool {
// if entities[i].ValidRow == true {
// if entities[j].ValidRow == false {
// return true
// }
// return entities[i].Scores > entities[j].Scores
// } else {
// return false
// }
//})
//rIds := make([]int64, 0, rows)
//rValidRow := make([]bool, 0, rows)
//rRowsData := make([]*pb.RowData, 0, rows)
//rScores := make([]float32, 0, rows)
//rDistances := make([]float32, 0, rows)
//for i := 0; i < rows; i++ {
// rIds = append(rIds, entities[i].Ids)
// rValidRow = append(rValidRow, entities[i].ValidRow)
// rRowsData = append(rRowsData, entities[i].RowsData)
// rScores = append(rScores, entities[i].Scores)
// rDistances = append(rDistances, entities[i].Distances)
//}
return &servicepb.QueryResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,

View File

@ -2,8 +2,6 @@ package proxy
import (
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type requestScheduler struct {
@ -11,12 +9,12 @@ type requestScheduler struct {
//manipulations requestQueue
manipulationsChan chan *manipulationReq // manipulation queue
mTimestamp typeutil.Timestamp
mTimestamp Timestamp
mTimestampMux sync.Mutex
//queries requestQueue
queryChan chan *queryReq
qTimestamp typeutil.Timestamp
qTimestamp Timestamp
qTimestampMux sync.Mutex
}
@ -25,7 +23,7 @@ type requestScheduler struct {
// bit_1 = 1: select manipulation queue
// bit_2 = 1: select query queue
// example: if mode = 3, then both definition and manipulation queues are selected
func (rs *requestScheduler) AreRequestsDelivered(ts typeutil.Timestamp, selection uint32) bool {
func (rs *requestScheduler) AreRequestsDelivered(ts Timestamp, selection uint32) bool {
r1 := func() bool {
if selection&uint32(2) == 0 {
return true

View File

@ -44,7 +44,7 @@ type proxyServer struct {
resultGroup string
numReaderNode int
proxyId int64
getTimestamp func(count uint32) ([]typeutil.Timestamp, error)
getTimestamp func(count uint32) ([]Timestamp, error)
client *etcd.Client
ctx context.Context
wg sync.WaitGroup

View File

@ -19,7 +19,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
mpb "github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
pb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)
@ -71,11 +70,11 @@ func startTestProxyServer(proxy_addr string, master_addr string, t *testing.T) *
resultGroup: "reusltG",
numReaderNode: 2,
proxyId: 1,
getTimestamp: func(count uint32) ([]typeutil.Timestamp, error) {
getTimestamp: func(count uint32) ([]Timestamp, error) {
timestamp += 100
t := make([]typeutil.Timestamp, count)
t := make([]Timestamp, count)
for i := 0; i < int(count); i++ {
t[i] = typeutil.Timestamp(timestamp)
t[i] = timestamp
}
return t, nil
},
@ -379,7 +378,7 @@ func TestProxyServer_InsertAndDelete(t *testing.T) {
assert.Equalf(t, primaryKey[i], uint64(i+1), "insert failed")
}
t.Logf("m_timestamp = %d", ps.reqSch.mTimestamp)
assert.Equalf(t, ps.reqSch.mTimestamp, typeutil.Timestamp(1300), "insert failed")
assert.Equalf(t, ps.reqSch.mTimestamp, Timestamp(1300), "insert failed")
}
func TestProxyServer_Search(t *testing.T) {

View File

@ -2,16 +2,13 @@ package proxy
import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
//type TimeStamp uint64
type task interface {
Id() int64 // return ReqId
Id() UniqueID // return ReqId
Type() internalpb.MsgType
GetTs() typeutil.Timestamp
SetTs(ts typeutil.Timestamp)
GetTs() Timestamp
SetTs(ts Timestamp)
PreExecute() error
Execute() error
PostExecute() error
@ -21,12 +18,12 @@ type task interface {
type baseTask struct {
ReqType internalpb.MsgType
ReqId int64
Ts typeutil.Timestamp
ProxyId int64
ReqId UniqueID
Ts Timestamp
ProxyId UniqueID
}
func (bt *baseTask) Id() int64 {
func (bt *baseTask) Id() UniqueID {
return bt.ReqId
}
@ -34,10 +31,10 @@ func (bt *baseTask) Type() internalpb.MsgType {
return bt.ReqType
}
func (bt *baseTask) GetTs() typeutil.Timestamp {
func (bt *baseTask) GetTs() Timestamp {
return bt.Ts
}
func (bt *baseTask) SetTs(ts typeutil.Timestamp) {
func (bt *baseTask) SetTs(ts Timestamp) {
bt.Ts = ts
}

View File

@ -4,13 +4,11 @@ import (
"container/list"
"log"
"sync"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type baseTaskQueue struct {
unissuedTasks *list.List
activeTasks map[typeutil.Timestamp]*task
activeTasks map[Timestamp]*task
utLock sync.Mutex
atLock sync.Mutex
}
@ -74,7 +72,7 @@ func (queue *baseTaskQueue) AddActiveTask(t *task) {
queue.activeTasks[ts] = t
}
func (queue *baseTaskQueue) PopActiveTask(ts typeutil.Timestamp) *task {
func (queue *baseTaskQueue) PopActiveTask(ts Timestamp) *task {
queue.atLock.Lock()
defer queue.atLock.Lock()
t, ok := queue.activeTasks[ts]
@ -86,7 +84,7 @@ func (queue *baseTaskQueue) PopActiveTask(ts typeutil.Timestamp) *task {
return nil
}
func (queue *baseTaskQueue) TaskDoneTest(ts typeutil.Timestamp) bool {
func (queue *baseTaskQueue) TaskDoneTest(ts Timestamp) bool {
queue.utLock.Lock()
defer queue.utLock.Unlock()
for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() {
@ -219,7 +217,7 @@ func (sched *taskScheduler) Start() error {
return nil
}
func (sched *taskScheduler) TaskDoneTest(ts typeutil.Timestamp) bool {
func (sched *taskScheduler) TaskDoneTest(ts Timestamp) bool {
ddTaskDone := sched.DdQueue.TaskDoneTest(ts)
dmTaskDone := sched.DmQueue.TaskDoneTest(ts)
dqTaskDone := sched.DqQueue.TaskDoneTest(ts)

View File

@ -9,18 +9,17 @@ import (
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/errors"
pb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type timeTick struct {
lastTick typeutil.Timestamp
currentTick typeutil.Timestamp
lastTick Timestamp
currentTick Timestamp
interval uint64
pulsarProducer pulsar.Producer
peer_id int64
ctx context.Context
areRequestsDelivered func(ts typeutil.Timestamp) bool
getTimestamp func() (typeutil.Timestamp, error)
areRequestsDelivered func(ts Timestamp) bool
getTimestamp func() (Timestamp, error)
}
func (tt *timeTick) tick() error {

View File

@ -9,7 +9,6 @@ import (
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
pb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
func TestTimeTick(t *testing.T) {
@ -29,15 +28,15 @@ func TestTimeTick(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), 4*time.Second)
var curTs typeutil.Timestamp
var curTs Timestamp
curTs = 0
tt := timeTick{
interval: 200,
pulsarProducer: producer,
peer_id: 1,
ctx: ctx,
areRequestsDelivered: func(ts typeutil.Timestamp) bool { return true },
getTimestamp: func() (typeutil.Timestamp, error) {
areRequestsDelivered: func(ts Timestamp) bool { return true },
getTimestamp: func() (Timestamp, error) {
curTs = curTs + 100
return curTs, nil
},

View File

@ -12,11 +12,14 @@ package reader
*/
import "C"
import "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
type UniqueID = typeutil.UniqueID
type Collection struct {
CollectionPtr C.CCollection
CollectionName string
CollectionID int64
CollectionID UniqueID
Partitions []*Partition
}

View File

@ -56,22 +56,22 @@ type serviceTimeMsg struct {
}
type InsertData struct {
insertIDs map[int64][]int64
insertTimestamps map[int64][]uint64
insertRecords map[int64][]*commonpb.Blob
insertOffset map[int64]int64
insertIDs map[UniqueID][]UniqueID
insertTimestamps map[UniqueID][]Timestamp
insertRecords map[UniqueID][]*commonpb.Blob
insertOffset map[UniqueID]int64
}
type DeleteData struct {
deleteIDs map[int64][]int64
deleteTimestamps map[int64][]uint64
deleteOffset map[int64]int64
deleteIDs map[UniqueID][]UniqueID
deleteTimestamps map[UniqueID][]Timestamp
deleteOffset map[UniqueID]int64
}
type DeleteRecord struct {
entityID int64
timestamp uint64
segmentID int64
entityID UniqueID
timestamp Timestamp
segmentID UniqueID
}
type DeletePreprocessData struct {

View File

@ -3,6 +3,8 @@ package reader
import (
"log"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
)
@ -36,7 +38,7 @@ func (msNode *msgStreamNode) Operate(in []*Msg) []*Msg {
}
for _, msg := range streamMsg.tsMessages {
switch (*msg).Type() {
case msgstream.KInsert:
case internalPb.MsgType_kInsert:
dmMsg.insertMessages = append(dmMsg.insertMessages, (*msg).(*msgstream.InsertTask))
// case msgstream.KDelete:
// dmMsg.deleteMessages = append(dmMsg.deleteMessages, (*msg).(*msgstream.DeleteTask))

View File

@ -15,9 +15,10 @@ import "C"
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"time"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/msgclient"
msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message"
@ -25,30 +26,6 @@ import (
type Timestamp = typeutil.Timestamp
//type InsertData struct {
// insertIDs map[int64][]int64
// insertTimestamps map[int64][]uint64
// insertRecords map[int64][][]byte
// insertOffset map[int64]int64
//}
//
//type DeleteData struct {
// deleteIDs map[int64][]int64
// deleteTimestamps map[int64][]uint64
// deleteOffset map[int64]int64
//}
//
//type DeleteRecord struct {
// entityID int64
// timestamp uint64
// segmentID int64
//}
//
//type DeletePreprocessData struct {
// deleteRecords []*DeleteRecord
// count int32
//}
type QueryNodeDataBuffer struct {
InsertDeleteBuffer []*msgPb.InsertOrDeleteMsg
SearchBuffer []*msgPb.SearchMsg

View File

@ -8,10 +8,10 @@ import (
msgpb "github.com/zilliztech/milvus-distributed/internal/proto/message"
)
type ResultEntityIds []int64
type ResultEntityIds []UniqueID
type SearchResult struct {
ResultIds []int64
ResultIds []UniqueID
ResultDistances []float32
}

View File

@ -65,8 +65,8 @@ func TestSearch_Search(t *testing.T) {
},
Uid: int64(i),
PartitionTag: "partition0",
Timestamp: uint64(i + 1000),
SegmentId: int64(i),
Timestamp: Timestamp(i + 1000),
SegmentId: UniqueID(i),
ChannelId: 0,
Op: msgPb.OpType_INSERT,
ClientId: 0,
@ -122,9 +122,9 @@ func TestSearch_Search(t *testing.T) {
FloatData: queryRawData,
},
PartitionTag: []string{"partition0"},
Uid: int64(0),
Timestamp: uint64(0),
ClientId: int64(0),
Uid: UniqueID(0),
Timestamp: Timestamp(0),
ClientId: UniqueID(0),
ExtraParams: nil,
Json: []string{queryJSON},
}

View File

@ -13,12 +13,16 @@ package reader
*/
import "C"
import (
"strconv"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
msgPb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type IntPrimaryKey = typeutil.IntPrimaryKey
const SegmentLifetime = 20000
const (
@ -30,8 +34,8 @@ const (
type Segment struct {
SegmentPtr C.CSegmentBase
SegmentID int64
SegmentCloseTime uint64
SegmentID UniqueID
SegmentCloseTime Timestamp
LastMemSize int64
SegmentStatus int
}
@ -126,7 +130,7 @@ func (s *Segment) SegmentPreDelete(numOfRecords int) int64 {
return int64(offset)
}
func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]uint64, records *[]*commonpb.Blob) error {
func (s *Segment) SegmentInsert(offset int64, entityIDs *[]UniqueID, timestamps *[]Timestamp, records *[]*commonpb.Blob) error {
/*
int
Insert(CSegmentBase c_segment,
@ -174,7 +178,7 @@ func (s *Segment) SegmentInsert(offset int64, entityIDs *[]int64, timestamps *[]
return nil
}
func (s *Segment) SegmentDelete(offset int64, entityIDs *[]int64, timestamps *[]uint64) error {
func (s *Segment) SegmentDelete(offset int64, entityIDs *[]UniqueID, timestamps *[]Timestamp) error {
/*
int
Delete(CSegmentBase c_segment,
@ -197,7 +201,7 @@ func (s *Segment) SegmentDelete(offset int64, entityIDs *[]int64, timestamps *[]
return nil
}
func (s *Segment) SegmentSearch(query *QueryInfo, timestamp uint64, vectorRecord *msgPb.VectorRowRecord) (*SearchResult, error) {
func (s *Segment) SegmentSearch(query *QueryInfo, timestamp Timestamp, vectorRecord *msgPb.VectorRowRecord) (*SearchResult, error) {
/*
int
Search(CSegmentBase c_segment,
@ -216,7 +220,7 @@ func (s *Segment) SegmentSearch(query *QueryInfo, timestamp uint64, vectorRecord
field_name: C.CString(query.FieldName),
}
resultIds := make([]int64, int64(query.TopK)*query.NumQueries)
resultIds := make([]IntPrimaryKey, int64(query.TopK)*query.NumQueries)
resultDistances := make([]float32, int64(query.TopK)*query.NumQueries)
var cTimestamp = C.ulong(timestamp)

View File

@ -12,10 +12,10 @@ import (
)
// Function `GetSegmentByEntityId` should return entityIDs, timestamps and segmentIDs
func (node *QueryNode) GetKey2Segments() (*[]int64, *[]uint64, *[]int64) {
var entityIDs = make([]int64, 0)
var timestamps = make([]uint64, 0)
var segmentIDs = make([]int64, 0)
func (node *QueryNode) GetKey2Segments() (*[]UniqueID, *[]Timestamp, *[]UniqueID) {
var entityIDs = make([]UniqueID, 0)
var timestamps = make([]Timestamp, 0)
var segmentIDs = make([]UniqueID, 0)
var key2SegMsg = node.messageClient.Key2SegMsg
for _, msg := range key2SegMsg {
@ -35,7 +35,7 @@ func (node *QueryNode) GetKey2Segments() (*[]int64, *[]uint64, *[]int64) {
return &entityIDs, &timestamps, &segmentIDs
}
func (node *QueryNode) GetCollectionByID(collectionID int64) *Collection {
func (node *QueryNode) GetCollectionByID(collectionID UniqueID) *Collection {
for _, collection := range node.Collections {
if collection.CollectionID == collectionID {
return collection
@ -55,7 +55,7 @@ func (node *QueryNode) GetCollectionByCollectionName(collectionName string) (*Co
return nil, errors.New("Cannot found collection: " + collectionName)
}
func (node *QueryNode) GetSegmentBySegmentID(segmentID int64) (*Segment, error) {
func (node *QueryNode) GetSegmentBySegmentID(segmentID UniqueID) (*Segment, error) {
targetSegment := node.SegmentsMap[segmentID]
if targetSegment == nil {
@ -65,7 +65,7 @@ func (node *QueryNode) GetSegmentBySegmentID(segmentID int64) (*Segment, error)
return targetSegment, nil
}
func (node *QueryNode) FoundSegmentBySegmentID(segmentID int64) bool {
func (node *QueryNode) FoundSegmentBySegmentID(segmentID UniqueID) bool {
_, ok := node.SegmentsMap[segmentID]
return ok

View File

@ -38,9 +38,9 @@ func TestUtilFunctions_GetKey2Segments(t *testing.T) {
for i := 0; i < msgLength; i++ {
key2SegMsg := msgPb.Key2SegMsg{
Uid: int64(i),
Timestamp: uint64(i + 1000),
SegmentId: []int64{int64(i)},
Uid: UniqueID(i),
Timestamp: Timestamp(i + 1000),
SegmentId: []UniqueID{UniqueID(i)},
}
node.messageClient.Key2SegMsg = append(node.messageClient.Key2SegMsg, &key2SegMsg)
}

View File

@ -2,11 +2,13 @@ package storagetype
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type Key = []byte
type Value = []byte
type Timestamp = uint64
type Timestamp = typeutil.Timestamp
type DriverType = string
type SegmentIndex = []byte
type SegmentDL = []byte

View File

@ -11,6 +11,8 @@ import (
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/conf"
@ -18,6 +20,9 @@ import (
pb "github.com/zilliztech/milvus-distributed/internal/proto/message"
)
type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type InsertLog struct {
MsgLength int
DurationInMilliseconds int64
@ -44,7 +49,7 @@ type TimeSync interface {
}
type TimeSyncMsg struct {
Timestamp uint64
Timestamp Timestamp
NumRecorders int64
}
@ -89,7 +94,7 @@ func NewTimeSync(
timeSyncSubName string,
readTopics []string,
readSubName string,
proxyIdList []int64,
proxyIdList []UniqueID,
readStopFlagClientId int64,
opts ...TimeSyncOption,
) (TimeSync, error) {

View File

@ -47,7 +47,7 @@ const (
func TestAlignTimeSync(t *testing.T) {
r := &TimeSyncCfg{
proxyIdList: []int64{1, 2, 3},
proxyIdList: []UniqueID{1, 2, 3},
interval: 200,
}
ts := []*internalpb.TimeTickMsg{
@ -78,7 +78,7 @@ func TestAlignTimeSync(t *testing.T) {
func TestAlignTimeSync2(t *testing.T) {
r := &TimeSyncCfg{
proxyIdList: []int64{1, 2, 3},
proxyIdList: []UniqueID{1, 2, 3},
interval: 200,
}
ts := []*internalpb.TimeTickMsg{
@ -107,7 +107,7 @@ func TestAlignTimeSync2(t *testing.T) {
func TestAlignTimeSync3(t *testing.T) {
r := &TimeSyncCfg{
proxyIdList: []int64{1, 2, 3},
proxyIdList: []UniqueID{1, 2, 3},
interval: 200,
}
ts := []*internalpb.TimeTickMsg{
@ -145,7 +145,7 @@ func TestAlignTimeSync3(t *testing.T) {
func TestAlignTimeSync4(t *testing.T) {
r := &TimeSyncCfg{
proxyIdList: []int64{1},
proxyIdList: []UniqueID{1},
interval: 200,
}
ts := []*internalpb.TimeTickMsg{
@ -176,7 +176,7 @@ func TestAlignTimeSync4(t *testing.T) {
func TestAlignTimeSync5(t *testing.T) {
r := &TimeSyncCfg{
proxyIdList: []int64{1, 2, 3},
proxyIdList: []UniqueID{1, 2, 3},
interval: 200,
}
ts := []*internalpb.TimeTickMsg{
@ -213,7 +213,7 @@ func TestNewTimeSync(t *testing.T) {
timeSyncSubName,
[]string{readerTopic1, readerTopic2, readerTopic3, readerTopic4},
readerSubName,
[]int64{2, 1},
[]UniqueID{2, 1},
interval,
WithReaderQueueSize(8),
)
@ -296,7 +296,7 @@ func TestTimeSync(t *testing.T) {
timeSyncSubName,
[]string{readerTopic1, readerTopic2, readerTopic3, readerTopic4},
readerSubName,
[]int64{2, 1},
[]UniqueID{2, 1},
interval,
WithReaderQueueSize(1024),
)
@ -589,7 +589,7 @@ func startWriteTimeSync(id int64, topic string, client pulsar.Client, duration t
func startProxy(pt pulsar.Producer, ptid int64, pr1 pulsar.Producer, prid1 int64, pr2 pulsar.Producer, prid2 int64, duration time.Duration, t *testing.T) {
total := int(duration / (10 * time.Millisecond))
ticker := time.Tick(10 * time.Millisecond)
var timestamp uint64 = 0
var timestamp Timestamp = 0
for i := 1; i <= total; i++ {
<-ticker
timestamp += 10

View File

@ -1,4 +1,5 @@
package typeutil
type Timestamp = uint64
type ID = int64
type IntPrimaryKey = int64
type UniqueID = int64

View File

@ -1,227 +0,0 @@
package writer
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/msgclient"
msgpb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"github.com/zilliztech/milvus-distributed/internal/storage"
storagetype "github.com/zilliztech/milvus-distributed/internal/storage/type"
)
type SegmentIdInfo struct {
CollectionName string
EntityId int64
SegmentIds []string
}
type MsgCounter struct {
InsertCounter int64
InsertTime time.Time
// InsertedRecordSize float64
DeleteCounter int64
DeleteTime time.Time
}
type InsertLog struct {
MsgLength int
DurationInMilliseconds int64
InsertTime time.Time
NumSince int64
Speed float64
}
type WriteNode struct {
KvStore *storagetype.Store
MessageClient *msgclient.WriterMessageClient
TimeSync uint64
MsgCounter *MsgCounter
InsertLogs []InsertLog
}
func (wn *WriteNode) Close() {
wn.MessageClient.Close()
}
func NewWriteNode(ctx context.Context,
address string,
topics []string,
timeSync uint64) (*WriteNode, error) {
kv, err := storage.NewStore(context.Background(), storagetype.MinIODriver)
mc := msgclient.WriterMessageClient{}
msgCounter := MsgCounter{
InsertCounter: 0,
InsertTime: time.Now(),
DeleteCounter: 0,
DeleteTime: time.Now(),
// InsertedRecordSize: 0,
}
return &WriteNode{
KvStore: &kv,
MessageClient: &mc,
TimeSync: timeSync,
MsgCounter: &msgCounter,
InsertLogs: make([]InsertLog, 0),
}, err
}
func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*msgpb.InsertOrDeleteMsg, wg *sync.WaitGroup) error {
var prefixKey string
var suffixKey string
var prefixKeys [][]byte
var suffixKeys []string
var binaryData [][]byte
var timeStamp []uint64
byteArr := make([]byte, 8)
intData := uint64(0)
binary.BigEndian.PutUint64(byteArr, intData)
for i := 0; i < len(data); i++ {
prefixKey = data[i].CollectionName + "-" + strconv.FormatUint(uint64(data[i].Uid), 10)
suffixKey = strconv.FormatUint(uint64(data[i].SegmentId), 10)
prefixKeys = append(prefixKeys, []byte(prefixKey))
suffixKeys = append(suffixKeys, suffixKey)
binaryData = append(binaryData, byteArr)
timeStamp = append(timeStamp, uint64(data[i].Timestamp))
}
error := (*wn.KvStore).PutRows(ctx, prefixKeys, binaryData, suffixKeys, timeStamp)
if error != nil {
fmt.Println("Can't insert data!")
wg.Done()
return error
}
wg.Done()
return nil
}
func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*msgpb.InsertOrDeleteMsg) error {
var prefixKey string
var prefixKeys [][]byte
var timeStamps []uint64
for i := 0; i < len(data); i++ {
prefixKey = data[i].CollectionName + "-" + strconv.FormatUint(uint64(data[i].Uid), 10)
prefixKeys = append(prefixKeys, []byte(prefixKey))
timeStamps = append(timeStamps, uint64(data[i].Timestamp))
segmentString, _ := (*wn.KvStore).GetSegments(ctx, []byte(prefixKey), uint64(data[i].Timestamp))
var segmentIds []int64
for _, str := range segmentString {
id, err := strconv.ParseInt(str, 10, 64)
if err != nil {
fmt.Println(str, " is not an integer.")
}
segmentIds = append(segmentIds, id)
}
segmentInfo := msgpb.Key2SegMsg{
Uid: data[i].Uid,
SegmentId: segmentIds,
Timestamp: data[i].Timestamp,
}
wn.MessageClient.Send(ctx, segmentInfo)
}
wn.MsgCounter.DeleteCounter += int64(len(timeStamps))
err := (*wn.KvStore).DeleteRows(ctx, prefixKeys, timeStamps)
if err != nil {
fmt.Println("Can't delete data")
return err
}
return nil
}
func (wn *WriteNode) UpdateTimeSync(timeSync uint64) {
wn.TimeSync = timeSync
}
func (wn *WriteNode) DoWriteNode(ctx context.Context) {
numInsertData := len(wn.MessageClient.InsertMsg)
numGoRoute := conf.Config.Writer.Parallelism
batchSize := numInsertData / numGoRoute
if numInsertData%numGoRoute != 0 {
batchSize += 1
}
start := 0
end := 0
wg := sync.WaitGroup{}
for end < numInsertData {
if end+batchSize >= numInsertData {
end = numInsertData
} else {
end = end + batchSize
}
wg.Add(1)
go wn.InsertBatchData(ctx, wn.MessageClient.InsertMsg[start:end], &wg)
start = end
}
wg.Wait()
wn.WriterLog(numInsertData)
wn.DeleteBatchData(ctx, wn.MessageClient.DeleteMsg)
wn.UpdateTimeSync(wn.MessageClient.TimeSync())
}
func (wn *WriteNode) WriterLog(length int) {
wn.MsgCounter.InsertCounter += int64(length)
timeNow := time.Now()
duration := timeNow.Sub(wn.MsgCounter.InsertTime)
speed := float64(length) / duration.Seconds()
insertLog := InsertLog{
MsgLength: length,
DurationInMilliseconds: duration.Milliseconds(),
InsertTime: timeNow,
NumSince: wn.MsgCounter.InsertCounter,
Speed: speed,
}
wn.InsertLogs = append(wn.InsertLogs, insertLog)
wn.MsgCounter.InsertTime = timeNow
}
func (wn *WriteNode) WriteWriterLog() {
f, err := os.OpenFile("/tmp/write_node_insert.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Fatal(err)
}
// write logs
for _, insertLog := range wn.InsertLogs {
insertLogJson, err := json.Marshal(&insertLog)
if err != nil {
log.Fatal(err)
}
writeString := string(insertLogJson) + "\n"
//fmt.Println(writeString)
_, err2 := f.WriteString(writeString)
if err2 != nil {
log.Fatal(err2)
}
}
// reset InsertLogs buffer
wn.InsertLogs = make([]InsertLog, 0)
err = f.Close()
if err != nil {
log.Fatal(err)
}
fmt.Println("write write node log done")
}

View File

@ -5,26 +5,30 @@ import (
"sync"
"testing"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
msgpb "github.com/zilliztech/milvus-distributed/internal/proto/message"
"github.com/zilliztech/milvus-distributed/internal/writer"
)
func GetInsertMsg(collectionName string, partitionTag string, entityId int64) *msgpb.InsertOrDeleteMsg {
type Timestamp = typeutil.Timestamp
func GetInsertMsg(collectionName string, partitionTag string, entityId UniqueID) *msgpb.InsertOrDeleteMsg {
return &msgpb.InsertOrDeleteMsg{
CollectionName: collectionName,
PartitionTag: partitionTag,
SegmentId: int64(entityId / 100),
Uid: int64(entityId),
Timestamp: uint64(entityId),
SegmentId: UniqueID(entityId / 100),
Uid: UniqueID(entityId),
Timestamp: Timestamp(entityId),
ClientId: 0,
}
}
func GetDeleteMsg(collectionName string, entityId int64) *msgpb.InsertOrDeleteMsg {
func GetDeleteMsg(collectionName string, entityId UniqueID) *msgpb.InsertOrDeleteMsg {
return &msgpb.InsertOrDeleteMsg{
CollectionName: collectionName,
Uid: entityId,
Timestamp: uint64(entityId + 100),
Timestamp: Timestamp(entityId + 100),
}
}
@ -38,7 +42,7 @@ func TestInsert(t *testing.T) {
writerNode, _ := writer.NewWriteNode(ctx, "null", topics, 0)
var insertMsgs []*msgpb.InsertOrDeleteMsg
for i := 0; i < 120; i++ {
insertMsgs = append(insertMsgs, GetInsertMsg("collection0", "tag01", int64(i)))
insertMsgs = append(insertMsgs, GetInsertMsg("collection0", "tag01", UniqueID(i)))
}
wg := sync.WaitGroup{}
wg.Add(3)
@ -46,7 +50,7 @@ func TestInsert(t *testing.T) {
writerNode.InsertBatchData(ctx, insertMsgs, &wg)
var insertMsgs2 []*msgpb.InsertOrDeleteMsg
for i := 120; i < 200; i++ {
insertMsgs2 = append(insertMsgs2, GetInsertMsg("collection0", "tag02", int64(i)))
insertMsgs2 = append(insertMsgs2, GetInsertMsg("collection0", "tag02", UniqueID(i)))
}
writerNode.InsertBatchData(ctx, insertMsgs2, &wg)
var deleteMsgs []*msgpb.InsertOrDeleteMsg

View File

@ -5,12 +5,16 @@ import (
"log"
"testing"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
msgpb "github.com/zilliztech/milvus-distributed/internal/proto/message"
)
type UniqueID = typeutil.UniqueID
func TestKey2Seg(t *testing.T) {
// TODO: fix test
return
@ -31,11 +35,11 @@ func TestKey2Seg(t *testing.T) {
obj := msgpb.Key2SegMsg{}
msg, err := consumer.Receive(context.Background())
proto.Unmarshal(msg.Payload(), &obj)
assert.Equal(t, obj.Uid, int64(0))
assert.Equal(t, obj.Uid, UniqueID(0))
consumer.Ack(msg)
msg, err = consumer.Receive(context.Background())
proto.Unmarshal(msg.Payload(), &obj)
assert.Equal(t, obj.Uid, int64(0))
assert.Equal(t, obj.Uid, UniqueID(0))
consumer.Ack(msg)
consumer.Close()
client.Close()