From 6b8c82dede3a3b43b6297a21a2cdec73a7c14cdf Mon Sep 17 00:00:00 2001
From: neza2017
Date: Tue, 8 Sep 2020 20:50:53 +0800
Subject: [PATCH] Add timesync

Signed-off-by: neza2017
---
 timesync/readertimesync.go      | 289 +++++++++++++++++++++++++++
 timesync/readertimesync_test.go | 337 ++++++++++++++++++++++++++++++++
 2 files changed, 626 insertions(+)
 create mode 100644 timesync/readertimesync.go
 create mode 100644 timesync/readertimesync_test.go

diff --git a/timesync/readertimesync.go b/timesync/readertimesync.go
new file mode 100644
index 0000000000..95750fb405
--- /dev/null
+++ b/timesync/readertimesync.go
@@ -0,0 +1,289 @@
package readertimesync

import (
	"context"
	"fmt"
	"github.com/apache/pulsar-client-go/pulsar"
	pb "github.com/czs007/suvlim/pkg/message"
	"github.com/golang/protobuf/proto"
	"log"
	"sort"
)

// TimeSyncClientId marks a message as a timestamp flag rather than a real insert/delete.
const TimeSyncClientId int64 = -1

// ReaderTimeSync merges the per-proxy time-sync stream with the reader topics and
// exposes aligned timestamps together with the insert/delete messages they cover.
type ReaderTimeSync interface {
	Start() error
	Close()
	TimeSync() <-chan TimeSyncMsg
	InsertOrDelete() <-chan *pb.InsertOrDeleteMsg
}

type TimeSyncMsg struct {
	Timestamp    uint64
	NumRecorders int64
}

type ReaderTimeSyncOption func(*readerTimeSyncCfg)

type readerTimeSyncCfg struct {
	pulsarClient pulsar.Client

	timeSyncConsumer pulsar.Consumer
	readerConsumer   pulsar.Consumer
	readerProducer   []pulsar.Producer

	timesyncMsgChan    chan TimeSyncMsg
	insertOrDeleteChan chan *pb.InsertOrDeleteMsg // output channel for insert/delete messages

	interval        int
	proxyIdList     []int64
	readerQueueSize int

	revTimesyncFromReader map[uint64]int

	ctx    context.Context
	cancel context.CancelFunc
}

func toTimeStamp(ts *pb.TimeSyncMsg) int {
	// strip the 18-bit logical part of the hybrid timestamp and return the
	// millisecond within the current second
	return int(ts.GetTimestamp()>>18) % 1000
}

// NewReaderTimeSync connects to Pulsar, subscribes to the time-sync topic and all
// reader topics, and creates one producer per reader topic.
func NewReaderTimeSync(
	pulsarAddr string,
	timeSyncTopic string,
	timeSyncSubName string,
	readTopics []string,
	readSubName string,
	proxyIdList []int64,
	interval int,
	opts ...ReaderTimeSyncOption,
) (ReaderTimeSync, error) {
	//reject an empty proxy id list or duplicated proxy ids
	if len(proxyIdList) == 0 {
		return nil, fmt.Errorf("proxy id list is empty")
	}
	if len(proxyIdList) > 1 {
		sort.Slice(proxyIdList, func(i int, j int) bool { return proxyIdList[i] < proxyIdList[j] })
	}
	for i := 1; i < len(proxyIdList); i++ {
		if proxyIdList[i] == proxyIdList[i-1] {
			return nil, fmt.Errorf("there are two proxies with the same id = %d", proxyIdList[i])
		}
	}
	r := &readerTimeSyncCfg{
		interval:    interval,
		proxyIdList: proxyIdList,
	}
	for _, opt := range opts {
		opt(r)
	}

	//check if read topic is empty
	if len(readTopics) == 0 {
		return nil, fmt.Errorf("read topic list is empty")
	}
	//set default value
	if r.readerQueueSize == 0 {
		r.readerQueueSize = 128
	}

	r.timesyncMsgChan = make(chan TimeSyncMsg, len(readTopics)*r.readerQueueSize)
	r.insertOrDeleteChan = make(chan *pb.InsertOrDeleteMsg, len(readTopics)*r.readerQueueSize)
	r.revTimesyncFromReader = make(map[uint64]int)
	r.ctx, r.cancel = context.WithCancel(context.Background())

	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddr})
	if err != nil {
		return nil, fmt.Errorf("failed to connect to pulsar, %v", err)
	}
	r.pulsarClient = client

	timeSyncChan := make(chan pulsar.ConsumerMessage, len(r.proxyIdList))
	if r.timeSyncConsumer, err = r.pulsarClient.Subscribe(pulsar.ConsumerOptions{
		Topic:                       timeSyncTopic,
		SubscriptionName:            timeSyncSubName,
		Type:                        pulsar.KeyShared,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		MessageChannel:              timeSyncChan,
	}); err != nil {
		return nil, fmt.Errorf("failed to subscribe topic %s, error = %v", timeSyncTopic, err)
	}

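	// Subscribe to all reader topics through a single consumer feeding one shared
	// channel, and create one producer per reader topic; these producers are only
	// used to re-broadcast timestamp flags into the reader streams.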
	readerChan := make(chan pulsar.ConsumerMessage, len(readTopics)*r.readerQueueSize)
	if r.readerConsumer, err = r.pulsarClient.Subscribe(pulsar.ConsumerOptions{
		Topics:                      readTopics,
		SubscriptionName:            readSubName,
		Type:                        pulsar.KeyShared,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		MessageChannel:              readerChan,
	}); err != nil {
		return nil, fmt.Errorf("failed to subscribe reader topics %v, error = %v", readTopics, err)
	}

	r.readerProducer = make([]pulsar.Producer, 0, len(readTopics))
	for i := 0; i < len(readTopics); i++ {
		rp, err := r.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: readTopics[i]})
		if err != nil {
			return nil, fmt.Errorf("failed to create reader producer %s, error = %v", readTopics[i], err)
		}
		r.readerProducer = append(r.readerProducer, rp)
	}

	return r, nil
}

func (r *readerTimeSyncCfg) Close() {
	r.cancel()
	r.timeSyncConsumer.Close()
	r.readerConsumer.Close()
	for i := 0; i < len(r.readerProducer); i++ {
		r.readerProducer[i].Close()
	}
	r.pulsarClient.Close()
}

func (r *readerTimeSyncCfg) Start() error {
	go r.startReadTopics()
	go r.startTimeSync()
	return r.ctx.Err()
}

func (r *readerTimeSyncCfg) InsertOrDelete() <-chan *pb.InsertOrDeleteMsg {
	return r.insertOrDeleteChan
}

func (r *readerTimeSyncCfg) TimeSync() <-chan TimeSyncMsg {
	return r.timesyncMsgChan
}

// alignTimeSync keeps only the latest time-sync message from each proxy. If the
// retained messages are more than half an interval apart it drops the stale prefix,
// and if they do not come from exactly the configured proxy set it clears the slice
// so the caller reads again.
func (r *readerTimeSyncCfg) alignTimeSync(ts []*pb.TimeSyncMsg) []*pb.TimeSyncMsg {
	if len(r.proxyIdList) > 1 {
		if len(ts) > 1 {
			for i := 1; i < len(r.proxyIdList); i++ {
				curIdx := len(ts) - 1 - i
				preIdx := len(ts) - i
				timeGap := toTimeStamp(ts[curIdx]) - toTimeStamp(ts[preIdx])
				if timeGap >= (r.interval/2) || timeGap <= (-r.interval/2) {
					ts = ts[preIdx:]
					return ts
				}
			}
			ts = ts[len(ts)-len(r.proxyIdList):]
			sort.Slice(ts, func(i int, j int) bool { return ts[i].Peer_Id < ts[j].Peer_Id })
			for i := 0; i < len(r.proxyIdList); i++ {
				if ts[i].Peer_Id != r.proxyIdList[i] {
					ts = ts[:0]
					return ts
				}
			}
		}
	} else if len(ts) > 0 {
		ts = ts[len(ts)-1:]
		return ts
	}
	return ts
}

// readTimeSync blocks until n more time-sync messages have been consumed (or ctx is
// cancelled) and appends them to ts.
func (r *readerTimeSyncCfg) readTimeSync(ctx context.Context, ts []*pb.TimeSyncMsg, n int) ([]*pb.TimeSyncMsg, error) {
	for i := 0; i < n; i++ {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case cm := <-r.timeSyncConsumer.Chan():
			msg := cm.Message
			var tsm pb.TimeSyncMsg
			if err := proto.Unmarshal(msg.Payload(), &tsm); err != nil {
				return nil, err
			}
			ts = append(ts, &tsm)
			r.timeSyncConsumer.AckID(msg.ID())
		}
	}
	return ts, nil
}

// startTimeSync collects one time-sync message per proxy, takes the smallest
// timestamp among them, and broadcasts it to every reader topic as a timestamp flag
// (ClientId == TimeSyncClientId).
func (r *readerTimeSyncCfg) startTimeSync() {
	tsm := make([]*pb.TimeSyncMsg, 0, len(r.proxyIdList)*2)
	ctx := r.ctx
	var err error
	for {
		for len(tsm) != len(r.proxyIdList) {
			tsm = r.alignTimeSync(tsm)
			tsm, err = r.readTimeSync(ctx, tsm, len(r.proxyIdList)-len(tsm))
			if err != nil {
				if ctx.Err() != nil {
					return
				} else {
					//TODO, log error msg
					log.Printf("read time sync error %v", err)
				}
			}
		}
		ts := tsm[0].Timestamp
		for i := 1; i < len(tsm); i++ {
			if tsm[i].Timestamp < ts {
				ts = tsm[i].Timestamp
			}
		}
		tsm = tsm[:0]
		//send timestamp flag to reader channel
		msg := pb.InsertOrDeleteMsg{Timestamp: ts, ClientId: TimeSyncClientId}
		payload, err := proto.Marshal(&msg)
		if err != nil {
			//TODO log error
			log.Printf("Marshal timesync flag error %v", err)
		} else {
			for _, p := range r.readerProducer {
				if _, err := p.Send(ctx, &pulsar.ProducerMessage{Payload: payload}); err != nil {
					//TODO, log error
					log.Printf("Send timesync flag error %v", err)
				}
			}
		}
	}
}

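// startReadTopics consumes the reader topics. Regular insert/delete messages are
// counted and forwarded on insertOrDeleteChan; timestamp flags (ClientId ==
// TimeSyncClientId) are tallied per timestamp, and once a flag has arrived from every
// reader topic the accumulated TimeSyncMsg is published on timesyncMsgChan.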
func (r *readerTimeSyncCfg) startReadTopics() {
	ctx := r.ctx
	tsm := TimeSyncMsg{Timestamp: 0, NumRecorders: 0}
	for {
		select {
		case <-ctx.Done():
			return
		case cm := <-r.readerConsumer.Chan():
			msg := cm.Message
			var imsg pb.InsertOrDeleteMsg
			if err := proto.Unmarshal(msg.Payload(), &imsg); err != nil {
				//TODO, log error
				log.Printf("unmarshal InsertOrDeleteMsg error %v", err)
				break
			}
			if imsg.ClientId == TimeSyncClientId { //timestamp flag
				gval := r.revTimesyncFromReader[imsg.Timestamp]
				gval++
				if gval >= len(r.readerProducer) {
					if imsg.Timestamp >= tsm.Timestamp {
						tsm.Timestamp = imsg.Timestamp
						r.timesyncMsgChan <- tsm
						tsm.NumRecorders = 0
					}
					delete(r.revTimesyncFromReader, imsg.Timestamp)
				} else {
					r.revTimesyncFromReader[imsg.Timestamp] = gval
				}
			} else {
				tsm.NumRecorders++
				r.insertOrDeleteChan <- &imsg
			}
			r.readerConsumer.AckID(msg.ID())
		}
	}
}

// WithReaderQueueSize overrides the default per-topic queue size (128) used to size
// the output channels.
func WithReaderQueueSize(size int) ReaderTimeSyncOption {
	return func(r *readerTimeSyncCfg) {
		r.readerQueueSize = size
	}
}

diff --git a/timesync/readertimesync_test.go b/timesync/readertimesync_test.go
new file mode 100644
index 0000000000..7b089904c1
--- /dev/null
+++ b/timesync/readertimesync_test.go
@@ -0,0 +1,337 @@
package readertimesync

import (
	"context"
	"github.com/apache/pulsar-client-go/pulsar"
	pb "github.com/czs007/suvlim/pkg/message"
	"github.com/golang/protobuf/proto"
	"log"
	"testing"
	"time"
)

const (
	pulsarAddr      = "pulsar://localhost:6650"
	timeSyncTopic   = "timesync"
	timeSyncSubName = "timesync-g"
	readerTopic1    = "reader1"
	readerTopic2    = "reader2"
	readerTopic3    = "reader3"
	readerTopic4    = "reader4"
	readerSubName   = "reader-g"
	interval        = 200
)

func TestAlignTimeSync(t *testing.T) {
	r := &readerTimeSyncCfg{
		proxyIdList: []int64{1, 2, 3},
		interval:    200,
	}
	ts := []*pb.TimeSyncMsg{
		{
			Peer_Id:   1,
			Timestamp: 5 << 18,
		},
		{
			Peer_Id:   3,
			Timestamp: 15 << 18,
		},
		{
			Peer_Id:   2,
			Timestamp: 20 << 18,
		},
	}
	ts = r.alignTimeSync(ts)
	if len(r.proxyIdList) != 3 {
		t.Fatalf("proxyIdList should be: 1 2 3")
	}
	for i := 0; i < len(r.proxyIdList); i++ {
		if r.proxyIdList[i] != ts[i].Peer_Id {
			t.Fatalf("align failed")
		}
	}
}

func TestAlignTimeSync2(t *testing.T) {
	r := &readerTimeSyncCfg{
		proxyIdList: []int64{1, 2, 3},
		interval:    200,
	}
	ts := []*pb.TimeSyncMsg{
		{
			Peer_Id:   1,
			Timestamp: 5 << 18,
		},
		{
			Peer_Id:   3,
			Timestamp: 150 << 18,
		},
		{
			Peer_Id:   2,
			Timestamp: 20 << 18,
		},
	}
	ts = r.alignTimeSync(ts)
	if len(r.proxyIdList) != 3 {
		t.Fatalf("proxyIdList should be: 1 2 3")
	}
	if len(ts) != 1 || ts[0].Peer_Id != 2 {
		t.Fatalf("align failed")
	}
}

func TestAlignTimeSync3(t *testing.T) {
	r := &readerTimeSyncCfg{
		proxyIdList: []int64{1, 2, 3},
		interval:    200,
	}
	ts := []*pb.TimeSyncMsg{
		{
			Peer_Id:   1,
			Timestamp: 5 << 18,
		},
		{
			Peer_Id:   1,
			Timestamp: 5 << 18,
		},
		{
			Peer_Id:   1,
			Timestamp: 5 << 18,
		},
		{
			Peer_Id:   3,
			Timestamp: 15 << 18,
		},
		{
			Peer_Id:   2,
			Timestamp: 20 << 18,
		},
	}
	ts = r.alignTimeSync(ts)
	if len(r.proxyIdList) != 3 {
		t.Fatalf("proxyIdList should be: 1 2 3")
	}
	for i := 0; i < len(r.proxyIdList); i++ {
		if r.proxyIdList[i] != ts[i].Peer_Id {
			t.Fatalf("align failed")
		}
	}
}

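// The tests below exercise the Pulsar integration and assume a broker is reachable
// at pulsarAddr (pulsar://localhost:6650).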
func TestNewReaderTimeSync(t *testing.T) {
	r, err := NewReaderTimeSync(pulsarAddr,
		timeSyncTopic,
		timeSyncSubName,
		[]string{readerTopic1, readerTopic2, readerTopic3, readerTopic4},
		readerSubName,
		[]int64{2, 1},
		interval,
		WithReaderQueueSize(8),
	)
	if err != nil {
		t.Fatal(err)
	}
	rr := r.(*readerTimeSyncCfg)
	if rr.pulsarClient == nil {
		t.Fatalf("create pulsar client failed")
	}
	if rr.timeSyncConsumer == nil {
		t.Fatalf("create time sync consumer failed")
	}
	if rr.readerConsumer == nil {
		t.Fatalf("create reader consumer failed")
	}
	if len(rr.readerProducer) != 4 {
		t.Fatalf("create reader producer failed")
	}
	if rr.interval != interval {
		t.Fatalf("interval should be %d", interval)
	}
	if rr.readerQueueSize != 8 {
		t.Fatalf("set read queue size failed")
	}
	if len(rr.proxyIdList) != 2 {
		t.Fatalf("set proxy id failed")
	}
	if rr.proxyIdList[0] != 1 || rr.proxyIdList[1] != 2 {
		t.Fatalf("set proxy id failed")
	}
	r.Close()
}

func TestPulsarClient(t *testing.T) {
	t.Skip("skip pulsar client")
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarAddr})
	if err != nil {
		t.Fatal(err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	go startWriteTimeSync(1, timeSyncTopic, client, 2*time.Second, t)
	go startWriteTimeSync(2, timeSyncTopic, client, 2*time.Second, t)
	timeSyncChan := make(chan pulsar.ConsumerMessage)
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:                       timeSyncTopic,
		SubscriptionName:            timeSyncSubName,
		Type:                        pulsar.KeyShared,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
		MessageChannel:              timeSyncChan,
	})
	if err != nil {
		t.Fatal(err)
	}
	for {
		select {
		case cm := <-timeSyncChan:
			msg := cm.Message
			var tsm pb.TimeSyncMsg
			if err := proto.Unmarshal(msg.Payload(), &tsm); err != nil {
				t.Fatal(err)
			}
			consumer.AckID(msg.ID())
			log.Printf("read time stamp, id = %d, time stamp = %d\n", tsm.Peer_Id, tsm.Timestamp)
		case <-ctx.Done():
			break
		}
		if ctx.Err() != nil {
			break
		}
	}
}

func TestReaderTimesync(t *testing.T) {
	r, err := NewReaderTimeSync(pulsarAddr,
		timeSyncTopic,
		timeSyncSubName,
		[]string{readerTopic1, readerTopic2, readerTopic3, readerTopic4},
		readerSubName,
		[]int64{2, 1},
		interval,
		WithReaderQueueSize(1024),
	)
	if err != nil {
		t.Fatal(err)
	}
	rr := r.(*readerTimeSyncCfg)
	pt1, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: timeSyncTopic})
	if err != nil {
		t.Fatalf("create time sync producer 1 error %v", err)
	}

	pt2, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: timeSyncTopic})
	if err != nil {
		t.Fatalf("create time sync producer 2 error %v", err)
	}

	pr1, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: readerTopic1})
	if err != nil {
		t.Fatalf("create reader 1 error %v", err)
	}

	pr2, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: readerTopic2})
	if err != nil {
		t.Fatalf("create reader 2 error %v", err)
	}

	pr3, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: readerTopic3})
	if err != nil {
		t.Fatalf("create reader 3 error %v", err)
	}

	pr4, err := rr.pulsarClient.CreateProducer(pulsar.ProducerOptions{Topic: readerTopic4})
	if err != nil {
		t.Fatalf("create reader 4 error %v", err)
	}

	go startProxy(pt1, 1, pr1, 1, pr2, 2, 2*time.Second, t)
	go startProxy(pt2, 2, pr3, 3, pr4, 4, 2*time.Second, t)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	if err := r.Start(); err != nil {
		t.Fatal(err)
	}

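	// Drain the aligned stream: for every TimeSyncMsg, read back exactly
	// NumRecorders insert/delete messages and check that none of them is older than
	// the previously reported sync timestamp.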
	var tsm1, tsm2 TimeSyncMsg
	for {
		if ctx.Err() != nil {
			break
		}
		select {
		case <-ctx.Done():
			tsm1.NumRecorders = 0
			break
		case tsm1 = <-r.TimeSync():

		}
		if tsm1.NumRecorders > 0 {
			for i := int64(0); i < tsm1.NumRecorders; i++ {
				im := <-r.InsertOrDelete()
				log.Printf("%d - %d", im.Timestamp, tsm2.Timestamp)
				if im.Timestamp < tsm2.Timestamp {
					t.Fatalf("time sync error, im.Timestamp = %d, tsm2.Timestamp = %d", im.Timestamp, tsm2.Timestamp)
				}
			}
			tsm2 = tsm1
		}
	}
	r.Close()
}

// startWriteTimeSync emits a TimeSyncMsg for the given proxy id every interval
// milliseconds for the given duration. It runs in its own goroutine, so failures are
// reported with t.Errorf rather than t.Fatalf.
func startWriteTimeSync(id int64, topic string, client pulsar.Client, duration time.Duration, t *testing.T) {
	p, _ := client.CreateProducer(pulsar.ProducerOptions{Topic: topic})
	ticker := time.Tick(interval * time.Millisecond)
	numSteps := int(duration / (interval * time.Millisecond))
	var tm uint64 = 0
	for i := 0; i < numSteps; i++ {
		<-ticker
		tm += interval
		tsm := pb.TimeSyncMsg{Timestamp: tm << 18, Peer_Id: id}
		tb, _ := proto.Marshal(&tsm)
		if _, err := p.Send(context.Background(), &pulsar.ProducerMessage{Payload: tb}); err != nil {
			t.Errorf("send failed tsm id=%d, timestamp=%d, err=%v", tsm.Peer_Id, tsm.Timestamp, err)
			return
		}
		//log.Printf("send tsm id=%d, timestamp=%d", tsm.Peer_Id, tsm.Timestamp)
	}
}

// startProxy simulates a proxy: it writes insert/delete messages to two reader topics
// every 10 ms and a TimeSyncMsg to the time-sync topic every 200 ms. It runs in its
// own goroutine, so failures are reported with t.Errorf rather than t.Fatalf.
func startProxy(pt pulsar.Producer, ptid int64, pr1 pulsar.Producer, prid1 int64, pr2 pulsar.Producer, prid2 int64, duration time.Duration, t *testing.T) {
	total := int(duration / (10 * time.Millisecond))
	ticker := time.Tick(10 * time.Millisecond)
	var timestamp uint64 = 0
	for i := 1; i <= total; i++ {
		<-ticker
		timestamp += 10
		msg := pb.InsertOrDeleteMsg{ClientId: prid1, Timestamp: timestamp << 18}
		mb, err := proto.Marshal(&msg)
		if err != nil {
			t.Errorf("marshal error %v", err)
			return
		}
		if _, err := pr1.Send(context.Background(), &pulsar.ProducerMessage{Payload: mb}); err != nil {
			t.Errorf("send msg error %v", err)
			return
		}

		msg.ClientId = prid2
		mb, err = proto.Marshal(&msg)
		if err != nil {
			t.Errorf("marshal error %v", err)
			return
		}
		if _, err := pr2.Send(context.Background(), &pulsar.ProducerMessage{Payload: mb}); err != nil {
			t.Errorf("send msg error %v", err)
			return
		}

		log.Printf("send msg id = [ %d %d ], timestamp = %d", prid1, prid2, timestamp)

		if i%20 == 0 {
			tm := pb.TimeSyncMsg{Peer_Id: ptid, Timestamp: timestamp << 18}
			tb, err := proto.Marshal(&tm)
			if err != nil {
				t.Errorf("marshal error %v", err)
				return
			}
			if _, err := pt.Send(context.Background(), &pulsar.ProducerMessage{Payload: tb}); err != nil {
				t.Errorf("send msg error %v", err)
				return
			}
			log.Printf("send timestamp id = %d, timestamp = %d", ptid, timestamp)
		}
	}
}
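
Usage note (not part of the patch): a minimal sketch of how a reader node might drive this package, assuming the module path github.com/czs007/suvlim seen in the imports above, a Pulsar broker at pulsar://localhost:6650, and the topic/subscription names used in the tests; all names and the millisecond interval are illustrative.

package main

import (
	"log"

	readertimesync "github.com/czs007/suvlim/timesync"
)

func main() {
	r, err := readertimesync.NewReaderTimeSync(
		"pulsar://localhost:6650",
		"timesync", "timesync-g",
		[]string{"reader1", "reader2"}, "reader-g",
		[]int64{1, 2}, // proxy ids
		200,           // sync interval in milliseconds
		readertimesync.WithReaderQueueSize(1024),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	if err := r.Start(); err != nil {
		log.Fatal(err)
	}

	for {
		// Each TimeSyncMsg carries a new safe timestamp and the number of
		// insert/delete messages received since the previous one.
		sync := <-r.TimeSync()
		for i := int64(0); i < sync.NumRecorders; i++ {
			msg := <-r.InsertOrDelete()
			_ = msg // apply the insert/delete; it is not older than the previous sync point
		}
	}
}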