From e94138816f542b8d17578c60ff4ae1fb5d337ddd Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Sat, 14 Nov 2020 15:26:14 +0800 Subject: [PATCH] Add tsLoop for master Signed-off-by: zhenshan.cao --- internal/master/grpc_service.go | 4 +-- internal/master/id/id.go | 8 ++++++ internal/master/master.go | 33 +++++++++++++++++++++++++ internal/master/tso/global_allocator.go | 4 +++ 4 files changed, 47 insertions(+), 2 deletions(-) diff --git a/internal/master/grpc_service.go b/internal/master/grpc_service.go index 8e1d217b66..63231be4cf 100644 --- a/internal/master/grpc_service.go +++ b/internal/master/grpc_service.go @@ -364,7 +364,7 @@ func (s *Master) AllocTimestamp(ctx context.Context, request *internalpb.TsoRequ } response := &internalpb.TsoResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR}, + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, Timestamp: ts, Count: count, } @@ -383,7 +383,7 @@ func (s *Master) AllocID(ctx context.Context, request *internalpb.IDRequest) (*i } response := &internalpb.IDResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR}, + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, ID: ts, Count: count, } diff --git a/internal/master/id/id.go b/internal/master/id/id.go index d8dbcd5abd..931e4b38fc 100644 --- a/internal/master/id/id.go +++ b/internal/master/id/id.go @@ -57,6 +57,10 @@ func (gia *GlobalIDAllocator) AllocOne() (UniqueID, error) { return idStart, nil } +func (gia *GlobalIDAllocator) UpdateID() error { + return gia.allocator.UpdateTSO() +} + func AllocOne() (UniqueID, error) { return allocator.AllocOne() } @@ -64,3 +68,7 @@ func AllocOne() (UniqueID, error) { func Alloc(count uint32) (UniqueID, UniqueID, error) { return allocator.Alloc(count) } + +func UpdateID() error { + return allocator.UpdateID() +} diff --git a/internal/master/master.go b/internal/master/master.go index db27119c12..db13a78206 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -54,6 +54,10 @@ type Master struct { kvBase *kv.EtcdKV scheduler *ddRequestScheduler mt *metaTable + + // tso ticker + tsTicker *time.Ticker + // Add callback functions at different stages startCallbacks []func() closeCallbacks []func() @@ -187,6 +191,10 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error { s.serverLoopWg.Add(1) go s.segmentStatisticsLoop() + + s.serverLoopWg.Add(1) + go s.tsLoop() + return nil } @@ -233,6 +241,31 @@ func (s *Master) grpcLoop(grpcPort int64) { } +func (s *Master) tsLoop() { + defer s.serverLoopWg.Done() + s.tsTicker = time.NewTicker(tso.UpdateTimestampStep) + defer s.tsTicker.Stop() + ctx, cancel := context.WithCancel(s.serverLoopCtx) + defer cancel() + for { + select { + case <-s.tsTicker.C: + if err := tso.UpdateTSO(); err != nil { + log.Println("failed to update timestamp", err) + return + } + if err := id.UpdateID(); err != nil { + log.Println("failed to update id", err) + return + } + case <-ctx.Done(): + // Server is closed and it should return nil. + log.Println("tsLoop is closed") + return + } + } +} + // todo use messagestream func (s *Master) pulsarLoop() { defer s.serverLoopWg.Done() diff --git a/internal/master/tso/global_allocator.go b/internal/master/tso/global_allocator.go index 0307107ef6..31aba37d78 100644 --- a/internal/master/tso/global_allocator.go +++ b/internal/master/tso/global_allocator.go @@ -136,3 +136,7 @@ func AllocOne() (typeutil.Timestamp, error) { func Alloc(count uint32) (typeutil.Timestamp, error) { return allocator.Alloc(count) } + +func UpdateTSO() error { + return allocator.UpdateTSO() +}