mirror of https://github.com/milvus-io/milvus.git
parent
7becfe1b99
commit
6b8c82dede
|
@ -0,0 +1,289 @@
|
|||
package readertimesync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/apache/pulsar-client-go/pulsar"
|
||||
pb "github.com/czs007/suvlim/pkg/message"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"log"
|
||||
"sort"
|
||||
)
|
||||
|
||||
const TimeSyncClientId int64 = -1
|
||||
|
||||
type ReaderTimeSync interface {
|
||||
Start() error
|
||||
Close()
|
||||
TimeSync() <-chan TimeSyncMsg
|
||||
InsertOrDelete() <-chan *pb.InsertOrDeleteMsg
|
||||
}
|
||||
|
||||
type TimeSyncMsg struct {
|
||||
Timestamp uint64
|
||||
NumRecorders int64
|
||||
}
|
||||
type ReaderTimeSyncOption func(*readerTimeSyncCfg)
|
||||
|
||||
type readerTimeSyncCfg struct {
|
||||
pulsarClient pulsar.Client
|
||||
|
||||
timeSyncConsumer pulsar.Consumer
|
||||
readerConsumer pulsar.Consumer
|
||||
readerProducer []pulsar.Producer
|
||||
|
||||
timesyncMsgChan chan TimeSyncMsg
|
||||
insertOrDeleteChan chan *pb.InsertOrDeleteMsg //output insert or delete msg
|
||||
|
||||
interval int
|
||||
proxyIdList []int64
|
||||
readerQueueSize int
|
||||
|
||||
revTimesyncFromReader map[uint64]int
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func toTimeStamp(ts *pb.TimeSyncMsg) int {
|
||||
// get Millisecond in second
|
||||
return int(ts.GetTimestamp()>>18) % 1000
|
||||
}
|
||||
|
||||
func NewReaderTimeSync(
|
||||
pulsarAddr string,
|
||||
timeSyncTopic string,
|
||||
timeSyncSubName string,
|
||||
readTopics []string,
|
||||
readSubName string,
|
||||
proxyIdList []int64,
|
||||
interval int,
|
||||
opts ...ReaderTimeSyncOption,
|
||||
) (ReaderTimeSync, error) {
|
||||
//check if proxyId has duplication
|
||||
if len(proxyIdList) == 0 {
|
||||
return nil, fmt.Errorf("proxy id list is empty")
|
||||
}
|
||||
if len(proxyIdList) > 1 {
|
||||
sort.Slice(proxyIdList, func(i int, j int) bool { return proxyIdList[i] < proxyIdList[j] })
|
||||
}
|
||||
for i := 1; i < len(proxyIdList); i++ {
|
||||
if proxyIdList[i] == proxyIdList[i-1] {
|
||||
return nil, fmt.Errorf("there are two proxies have the same id = %d", proxyIdList[i])
|
||||
}
|
||||
}
|
||||
r := &readerTimeSyncCfg{
|
||||
interval: interval,
|
||||
proxyIdList: proxyIdList,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(r)
|
||||
}
|
||||
|
||||
//check if read topic is empty
|
||||
if len(readTopics) == 0 {
|
||||
return nil, fmt.Errorf("read topic is empyt")
|
||||
}
|
||||
//set default value
|
||||
if r.readerQueueSize == 0 {
|
||||
r.readerQueueSize = 128
|
||||
}
|
||||
|
||||
r.timesyncMsgChan = make(chan TimeSyncMsg, len(readTopics)*r.readerQueueSize)
|
||||
r.insertOrDeleteChan = make(chan *pb.InsertOrDeleteMsg, len(readTopics)*r.readerQueueSize)
|
||||
r.revTimesyncFromReader = make(map[uint64]int)
|
||||
r.ctx, r.cancel = context.WithCancel(context.Background())
|
||||
|
||||
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddr})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connect pulsar failed, %v", err)
|
||||
}
|
||||
r.pulsarClient = client
|
||||
|
||||
timeSyncChan := make(chan pulsar.ConsumerMessage, len(r.proxyIdList))
|
||||
if r.timeSyncConsumer, err = r.pulsarClient.Subscribe(pulsar.ConsumerOptions{
|
||||
Topic: timeSyncTopic,
|
||||
SubscriptionName: timeSyncSubName,
|
||||
Type: pulsar.KeyShared,
|
||||
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
|
||||
MessageChannel: timeSyncChan,
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("failed to subscribe topic %s, error = %v", timeSyncTopic, err)
|
||||
}
|
||||
|
||||
readerChan := make(chan pulsar.ConsumerMessage, len(readTopics)*r.readerQueueSize)
|
||||
if r.readerConsumer, err = r.pulsarClient.Subscribe(pulsar.ConsumerOptions{
|
||||
Topics: readTopics,
|
||||
SubscriptionName: readSubName,
|
||||
Type: pulsar.KeyShared,
|
||||
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
|
||||
MessageChannel: readerChan,
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("failed to subscrive reader topics : %v, error = %v", readTopics, err)
|
||||
}
|
||||
|
||||
r.readerProducer = make([]pulsar.Producer, 0, len(readTopics))
|
||||
for i := 0; i < len(readTopics); i++ {
|
||||
rp, err := r.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: readTopics[i]})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create reader producer %s, error = %v", readTopics[i], err)
|
||||
}
|
||||
r.readerProducer = append(r.readerProducer, rp)
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (r *readerTimeSyncCfg) Close() {
|
||||
r.cancel()
|
||||
r.timeSyncConsumer.Close()
|
||||
r.readerConsumer.Close()
|
||||
for i := 0; i < len(r.readerProducer); i++ {
|
||||
r.readerProducer[i].Close()
|
||||
}
|
||||
r.pulsarClient.Close()
|
||||
}
|
||||
|
||||
func (r *readerTimeSyncCfg) Start() error {
|
||||
go r.startReadTopics()
|
||||
go r.startTimeSync()
|
||||
return r.ctx.Err()
|
||||
}
|
||||
|
||||
func (r *readerTimeSyncCfg) InsertOrDelete() <-chan *pb.InsertOrDeleteMsg {
|
||||
return r.insertOrDeleteChan
|
||||
}
|
||||
|
||||
func (r *readerTimeSyncCfg) TimeSync() <-chan TimeSyncMsg {
|
||||
return r.timesyncMsgChan
|
||||
}
|
||||
|
||||
func (r *readerTimeSyncCfg) alignTimeSync(ts []*pb.TimeSyncMsg) []*pb.TimeSyncMsg {
|
||||
if len(r.proxyIdList) > 1 {
|
||||
if len(ts) > 1 {
|
||||
for i := 1; i < len(r.proxyIdList); i++ {
|
||||
curIdx := len(ts) - 1 - i
|
||||
preIdx := len(ts) - i
|
||||
timeGap := toTimeStamp(ts[curIdx]) - toTimeStamp(ts[preIdx])
|
||||
if timeGap >= (r.interval/2) || timeGap <= (-r.interval/2) {
|
||||
ts = ts[preIdx:]
|
||||
return ts
|
||||
}
|
||||
}
|
||||
ts = ts[len(ts)-len(r.proxyIdList):]
|
||||
sort.Slice(ts, func(i int, j int) bool { return ts[i].Peer_Id < ts[j].Peer_Id })
|
||||
for i := 0; i < len(r.proxyIdList); i++ {
|
||||
if ts[i].Peer_Id != r.proxyIdList[i] {
|
||||
ts = ts[:0]
|
||||
return ts
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ts = ts[len(ts)-1:]
|
||||
return ts
|
||||
}
|
||||
return ts
|
||||
}
|
||||
|
||||
func (r *readerTimeSyncCfg) readTimeSync(ctx context.Context, ts []*pb.TimeSyncMsg, n int) ([]*pb.TimeSyncMsg, error) {
|
||||
for i := 0; i < n; i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case cm := <-r.timeSyncConsumer.Chan():
|
||||
msg := cm.Message
|
||||
var tsm pb.TimeSyncMsg
|
||||
if err := proto.Unmarshal(msg.Payload(), &tsm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ts = append(ts, &tsm)
|
||||
r.timeSyncConsumer.AckID(msg.ID())
|
||||
}
|
||||
}
|
||||
return ts, nil
|
||||
}
|
||||
|
||||
func (r *readerTimeSyncCfg) startTimeSync() {
|
||||
tsm := make([]*pb.TimeSyncMsg, 0, len(r.proxyIdList)*2)
|
||||
ctx, _ := context.WithCancel(r.ctx)
|
||||
var err error
|
||||
for {
|
||||
for len(tsm) != len(r.proxyIdList) {
|
||||
tsm = r.alignTimeSync(tsm)
|
||||
tsm, err = r.readTimeSync(ctx, tsm, len(r.proxyIdList)-len(tsm))
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
} else {
|
||||
//TODO, log error msg
|
||||
log.Printf("read time sync error %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
ts := tsm[0].Timestamp
|
||||
for i := 1; i < len(tsm); i++ {
|
||||
if tsm[i].Timestamp < ts {
|
||||
ts = tsm[i].Timestamp
|
||||
}
|
||||
}
|
||||
tsm = tsm[:0]
|
||||
//send timestamp flag to reader channel
|
||||
msg := pb.InsertOrDeleteMsg{Timestamp: ts, ClientId: TimeSyncClientId}
|
||||
payload, err := proto.Marshal(&msg)
|
||||
if err != nil {
|
||||
//TODO log error
|
||||
log.Printf("Marshal timesync flag error %v", err)
|
||||
} else {
|
||||
for _, p := range r.readerProducer {
|
||||
if _, err := p.Send(ctx, &pulsar.ProducerMessage{Payload: payload}); err != nil {
|
||||
//TODO, log error
|
||||
log.Printf("Send timesync flag error %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *readerTimeSyncCfg) startReadTopics() {
|
||||
ctx, _ := context.WithCancel(r.ctx)
|
||||
tsm := TimeSyncMsg{Timestamp: 0, NumRecorders: 0}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case cm := <-r.readerConsumer.Chan():
|
||||
msg := cm.Message
|
||||
var imsg pb.InsertOrDeleteMsg
|
||||
if err := proto.Unmarshal(msg.Payload(), &imsg); err != nil {
|
||||
//TODO, log error
|
||||
log.Printf("unmarshal InsertOrDeleteMsg error %v", err)
|
||||
break
|
||||
}
|
||||
if imsg.ClientId == TimeSyncClientId { //timestamp flag
|
||||
gval := r.revTimesyncFromReader[imsg.Timestamp]
|
||||
gval++
|
||||
if gval >= len(r.readerProducer) {
|
||||
if imsg.Timestamp >= tsm.Timestamp {
|
||||
tsm.Timestamp = imsg.Timestamp
|
||||
r.timesyncMsgChan <- tsm
|
||||
tsm.NumRecorders = 0
|
||||
}
|
||||
delete(r.revTimesyncFromReader, imsg.Timestamp)
|
||||
} else {
|
||||
r.revTimesyncFromReader[imsg.Timestamp] = gval
|
||||
}
|
||||
} else {
|
||||
tsm.NumRecorders++
|
||||
r.insertOrDeleteChan <- &imsg
|
||||
}
|
||||
r.readerConsumer.AckID(msg.ID())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func WithReaderQueueSize(size int) ReaderTimeSyncOption {
|
||||
return func(r *readerTimeSyncCfg) {
|
||||
r.readerQueueSize = size
|
||||
}
|
||||
}
|
|
@ -0,0 +1,337 @@
|
|||
package readertimesync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/apache/pulsar-client-go/pulsar"
|
||||
pb "github.com/czs007/suvlim/pkg/message"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"log"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
pulsarAddr = "pulsar://localhost:6650"
|
||||
timeSyncTopic = "timesync"
|
||||
timeSyncSubName = "timesync-g"
|
||||
readerTopic1 = "reader1"
|
||||
readerTopic2 = "reader2"
|
||||
readerTopic3 = "reader3"
|
||||
readerTopic4 = "reader4"
|
||||
readerSubName = "reader-g"
|
||||
interval = 200
|
||||
)
|
||||
|
||||
func TestAlignTimeSync(t *testing.T) {
|
||||
r := &readerTimeSyncCfg{
|
||||
proxyIdList: []int64{1, 2, 3},
|
||||
interval: 200,
|
||||
}
|
||||
ts := []*pb.TimeSyncMsg{
|
||||
{
|
||||
Peer_Id: 1,
|
||||
Timestamp: 5 << 18,
|
||||
},
|
||||
{
|
||||
Peer_Id: 3,
|
||||
Timestamp: 15 << 18,
|
||||
},
|
||||
{
|
||||
Peer_Id: 2,
|
||||
Timestamp: 20 << 18,
|
||||
},
|
||||
}
|
||||
r.alignTimeSync(ts)
|
||||
if len(r.proxyIdList) != 3 {
|
||||
t.Fatalf("proxyIdList should be : 1 2 3")
|
||||
}
|
||||
for i := 0; i < len(r.proxyIdList); i++ {
|
||||
if r.proxyIdList[i] != ts[i].Peer_Id {
|
||||
t.Fatalf("Align falied")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestAlignTimeSync2(t *testing.T) {
|
||||
r := &readerTimeSyncCfg{
|
||||
proxyIdList: []int64{1, 2, 3},
|
||||
interval: 200,
|
||||
}
|
||||
ts := []*pb.TimeSyncMsg{
|
||||
{
|
||||
Peer_Id: 1,
|
||||
Timestamp: 5 << 18,
|
||||
},
|
||||
{
|
||||
Peer_Id: 3,
|
||||
Timestamp: 150 << 18,
|
||||
},
|
||||
{
|
||||
Peer_Id: 2,
|
||||
Timestamp: 20 << 18,
|
||||
},
|
||||
}
|
||||
ts = r.alignTimeSync(ts)
|
||||
if len(r.proxyIdList) != 3 {
|
||||
t.Fatalf("proxyIdList should be : 1 2 3")
|
||||
}
|
||||
if len(ts) != 1 || ts[0].Peer_Id != 2 {
|
||||
t.Fatalf("align failed")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestAlignTimeSync3(t *testing.T) {
|
||||
r := &readerTimeSyncCfg{
|
||||
proxyIdList: []int64{1, 2, 3},
|
||||
interval: 200,
|
||||
}
|
||||
ts := []*pb.TimeSyncMsg{
|
||||
{
|
||||
Peer_Id: 1,
|
||||
Timestamp: 5 << 18,
|
||||
},
|
||||
{
|
||||
Peer_Id: 1,
|
||||
Timestamp: 5 << 18,
|
||||
},
|
||||
{
|
||||
Peer_Id: 1,
|
||||
Timestamp: 5 << 18,
|
||||
},
|
||||
{
|
||||
Peer_Id: 3,
|
||||
Timestamp: 15 << 18,
|
||||
},
|
||||
{
|
||||
Peer_Id: 2,
|
||||
Timestamp: 20 << 18,
|
||||
},
|
||||
}
|
||||
ts = r.alignTimeSync(ts)
|
||||
if len(r.proxyIdList) != 3 {
|
||||
t.Fatalf("proxyIdList should be : 1 2 3")
|
||||
}
|
||||
for i := 0; i < len(r.proxyIdList); i++ {
|
||||
if r.proxyIdList[i] != ts[i].Peer_Id {
|
||||
t.Fatalf("Align falied")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewReaderTimeSync(t *testing.T) {
|
||||
r, err := NewReaderTimeSync(pulsarAddr,
|
||||
timeSyncTopic,
|
||||
timeSyncSubName,
|
||||
[]string{readerTopic1, readerTopic2, readerTopic3, readerTopic4},
|
||||
readerSubName,
|
||||
[]int64{2, 1},
|
||||
interval,
|
||||
WithReaderQueueSize(8),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rr := r.(*readerTimeSyncCfg)
|
||||
if rr.pulsarClient == nil {
|
||||
t.Fatalf("create pulsar client failed")
|
||||
}
|
||||
if rr.timeSyncConsumer == nil {
|
||||
t.Fatalf("create time sync consumer failed")
|
||||
}
|
||||
if rr.readerConsumer == nil {
|
||||
t.Fatalf("create reader consumer failed")
|
||||
}
|
||||
if len(rr.readerProducer) != 4 {
|
||||
t.Fatalf("create reader producer failed")
|
||||
}
|
||||
if rr.interval != interval {
|
||||
t.Fatalf("interval shoudl be %d", interval)
|
||||
}
|
||||
if rr.readerQueueSize != 8 {
|
||||
t.Fatalf("set read queue size failed")
|
||||
}
|
||||
if len(rr.proxyIdList) != 2 {
|
||||
t.Fatalf("set proxy id failed")
|
||||
}
|
||||
if rr.proxyIdList[0] != 1 || rr.proxyIdList[1] != 2 {
|
||||
t.Fatalf("set proxy id failed")
|
||||
}
|
||||
r.Close()
|
||||
}
|
||||
|
||||
func TestPulsarClient(t *testing.T) {
|
||||
t.Skip("skip pulsar client")
|
||||
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddr})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
go startWriteTimeSync(1, timeSyncTopic, client, 2*time.Second, t)
|
||||
go startWriteTimeSync(2, timeSyncTopic, client, 2*time.Second, t)
|
||||
timeSyncChan := make(chan pulsar.ConsumerMessage)
|
||||
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
|
||||
Topic: timeSyncTopic,
|
||||
SubscriptionName: timeSyncSubName,
|
||||
Type: pulsar.KeyShared,
|
||||
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
|
||||
MessageChannel: timeSyncChan,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case cm := <-timeSyncChan:
|
||||
msg := cm.Message
|
||||
var tsm pb.TimeSyncMsg
|
||||
if err := proto.Unmarshal(msg.Payload(), &tsm); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
consumer.AckID(msg.ID())
|
||||
log.Printf("read time stamp, id = %d, time stamp = %d\n", tsm.Peer_Id, tsm.Timestamp)
|
||||
case <-ctx.Done():
|
||||
break
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestReaderTimesync(t *testing.T) {
|
||||
r, err := NewReaderTimeSync(pulsarAddr,
|
||||
timeSyncTopic,
|
||||
timeSyncSubName,
|
||||
[]string{readerTopic1, readerTopic2, readerTopic3, readerTopic4},
|
||||
readerSubName,
|
||||
[]int64{2, 1},
|
||||
interval,
|
||||
WithReaderQueueSize(1024),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rr := r.(*readerTimeSyncCfg)
|
||||
pt1, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: timeSyncTopic})
|
||||
if err != nil {
|
||||
t.Fatalf("create time sync producer 1 error %v", err)
|
||||
}
|
||||
|
||||
pt2, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: timeSyncTopic})
|
||||
if err != nil {
|
||||
t.Fatalf("create time sync producer 2 error %v", err)
|
||||
}
|
||||
|
||||
pr1, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: readerTopic1})
|
||||
if err != nil {
|
||||
t.Fatalf("create reader 1 error %v", err)
|
||||
}
|
||||
|
||||
pr2, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: readerTopic2})
|
||||
if err != nil {
|
||||
t.Fatalf("create reader 2 error %v", err)
|
||||
}
|
||||
|
||||
pr3, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: readerTopic3})
|
||||
if err != nil {
|
||||
t.Fatalf("create reader 3 error %v", err)
|
||||
}
|
||||
|
||||
pr4, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: readerTopic4})
|
||||
if err != nil {
|
||||
t.Fatalf("create reader 4 error %v", err)
|
||||
}
|
||||
|
||||
go startProxy(pt1, 1, pr1, 1, pr2, 2, 2*time.Second, t)
|
||||
go startProxy(pt2, 2, pr3, 3, pr4, 4, 2*time.Second, t)
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
r.Start()
|
||||
|
||||
var tsm1, tsm2 TimeSyncMsg
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
tsm1.NumRecorders = 0
|
||||
break
|
||||
case tsm1 = <-r.TimeSync():
|
||||
|
||||
}
|
||||
if tsm1.NumRecorders > 0 {
|
||||
for i := int64(0); i < tsm1.NumRecorders; i++ {
|
||||
im := <-r.InsertOrDelete()
|
||||
log.Printf("%d - %d", im.Timestamp, tsm2.Timestamp)
|
||||
if im.Timestamp < tsm2.Timestamp {
|
||||
t.Fatalf("time sync error , im.Timestamp = %d, tsm2.Timestamp = %d", im.Timestamp, tsm2.Timestamp)
|
||||
}
|
||||
}
|
||||
tsm2 = tsm1
|
||||
}
|
||||
|
||||
}
|
||||
r.Close()
|
||||
}
|
||||
|
||||
func startWriteTimeSync(id int64, topic string, client pulsar.Client, duration time.Duration, t *testing.T) {
|
||||
p, _ := client.CreateProducer(pulsar.ProducerOptions{Topic: topic})
|
||||
ticker := time.Tick(interval * time.Millisecond)
|
||||
numSteps := int(duration / (interval * time.Millisecond))
|
||||
var tm uint64 = 0
|
||||
for i := 0; i < numSteps; i++ {
|
||||
<-ticker
|
||||
tm += interval
|
||||
tsm := pb.TimeSyncMsg{Timestamp: tm << 18, Peer_Id: id}
|
||||
tb, _ := proto.Marshal(&tsm)
|
||||
if _, err := p.Send(context.Background(), &pulsar.ProducerMessage{Payload: tb}); err != nil {
|
||||
t.Fatalf("send failed tsm id=%d, timestamp=%d, err=%v", tsm.Peer_Id, tsm.Timestamp, err)
|
||||
} else {
|
||||
//log.Printf("send tsm id=%d, timestamp=%d", tsm.Peer_Id, tsm.Timestamp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func startProxy(pt pulsar.Producer, ptid int64, pr1 pulsar.Producer, prid1 int64, pr2 pulsar.Producer, prid2 int64, duration time.Duration, t *testing.T) {
|
||||
total := int(duration / (10 * time.Millisecond))
|
||||
ticker := time.Tick(10 * time.Millisecond)
|
||||
var timestamp uint64 = 0
|
||||
for i := 1; i <= total; i++ {
|
||||
<-ticker
|
||||
timestamp += 10
|
||||
msg := pb.InsertOrDeleteMsg{ClientId: prid1, Timestamp: timestamp << 18}
|
||||
mb, err := proto.Marshal(&msg)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal error %v", err)
|
||||
}
|
||||
if _, err := pr1.Send(context.Background(), &pulsar.ProducerMessage{Payload: mb}); err != nil {
|
||||
t.Fatalf("send msg error %v", err)
|
||||
}
|
||||
|
||||
msg.ClientId = prid2
|
||||
mb, err = proto.Marshal(&msg)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal error %v", err)
|
||||
}
|
||||
if _, err := pr2.Send(context.Background(), &pulsar.ProducerMessage{Payload: mb}); err != nil {
|
||||
t.Fatalf("send msg error %v", err)
|
||||
}
|
||||
|
||||
log.Printf("send msg id = [ %d %d ], timestamp = %d", prid1, prid2, timestamp)
|
||||
|
||||
if i%20 == 0 {
|
||||
tm := pb.TimeSyncMsg{Peer_Id: ptid, Timestamp: timestamp << 18}
|
||||
tb, err := proto.Marshal(&tm)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal error %v", err)
|
||||
}
|
||||
if _, err := pt.Send(context.Background(), &pulsar.ProducerMessage{Payload: tb}); err != nil {
|
||||
t.Fatalf("send msg error %v", err)
|
||||
}
|
||||
log.Printf("send timestamp id = %d, timestamp = %d", ptid, timestamp)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue