mirror of https://github.com/milvus-io/milvus.git
Add tsLoop for master
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/4973/head^2
parent
ce969e9568
commit
e94138816f
|
@ -364,7 +364,7 @@ func (s *Master) AllocTimestamp(ctx context.Context, request *internalpb.TsoRequ
|
|||
}
|
||||
|
||||
response := &internalpb.TsoResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
|
||||
Timestamp: ts,
|
||||
Count: count,
|
||||
}
|
||||
|
@ -383,7 +383,7 @@ func (s *Master) AllocID(ctx context.Context, request *internalpb.IDRequest) (*i
|
|||
}
|
||||
|
||||
response := &internalpb.IDResponse{
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
|
||||
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
|
||||
ID: ts,
|
||||
Count: count,
|
||||
}
|
||||
|
|
|
@ -57,6 +57,10 @@ func (gia *GlobalIDAllocator) AllocOne() (UniqueID, error) {
|
|||
return idStart, nil
|
||||
}
|
||||
|
||||
func (gia *GlobalIDAllocator) UpdateID() error {
|
||||
return gia.allocator.UpdateTSO()
|
||||
}
|
||||
|
||||
func AllocOne() (UniqueID, error) {
|
||||
return allocator.AllocOne()
|
||||
}
|
||||
|
@ -64,3 +68,7 @@ func AllocOne() (UniqueID, error) {
|
|||
func Alloc(count uint32) (UniqueID, UniqueID, error) {
|
||||
return allocator.Alloc(count)
|
||||
}
|
||||
|
||||
func UpdateID() error {
|
||||
return allocator.UpdateID()
|
||||
}
|
||||
|
|
|
@ -54,6 +54,10 @@ type Master struct {
|
|||
kvBase *kv.EtcdKV
|
||||
scheduler *ddRequestScheduler
|
||||
mt *metaTable
|
||||
|
||||
// tso ticker
|
||||
tsTicker *time.Ticker
|
||||
|
||||
// Add callback functions at different stages
|
||||
startCallbacks []func()
|
||||
closeCallbacks []func()
|
||||
|
@ -187,6 +191,10 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error {
|
|||
|
||||
s.serverLoopWg.Add(1)
|
||||
go s.segmentStatisticsLoop()
|
||||
|
||||
s.serverLoopWg.Add(1)
|
||||
go s.tsLoop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -233,6 +241,31 @@ func (s *Master) grpcLoop(grpcPort int64) {
|
|||
|
||||
}
|
||||
|
||||
func (s *Master) tsLoop() {
|
||||
defer s.serverLoopWg.Done()
|
||||
s.tsTicker = time.NewTicker(tso.UpdateTimestampStep)
|
||||
defer s.tsTicker.Stop()
|
||||
ctx, cancel := context.WithCancel(s.serverLoopCtx)
|
||||
defer cancel()
|
||||
for {
|
||||
select {
|
||||
case <-s.tsTicker.C:
|
||||
if err := tso.UpdateTSO(); err != nil {
|
||||
log.Println("failed to update timestamp", err)
|
||||
return
|
||||
}
|
||||
if err := id.UpdateID(); err != nil {
|
||||
log.Println("failed to update id", err)
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
// Server is closed and it should return nil.
|
||||
log.Println("tsLoop is closed")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// todo use messagestream
|
||||
func (s *Master) pulsarLoop() {
|
||||
defer s.serverLoopWg.Done()
|
||||
|
|
|
@ -136,3 +136,7 @@ func AllocOne() (typeutil.Timestamp, error) {
|
|||
func Alloc(count uint32) (typeutil.Timestamp, error) {
|
||||
return allocator.Alloc(count)
|
||||
}
|
||||
|
||||
func UpdateTSO() error {
|
||||
return allocator.UpdateTSO()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue