diff --git a/internal/master/grpc_service.go b/internal/master/grpc_service.go index caf16239e9..01dabfa128 100644 --- a/internal/master/grpc_service.go +++ b/internal/master/grpc_service.go @@ -2,6 +2,7 @@ package master import ( "context" + "github.com/zilliztech/milvus-distributed/internal/master/tso" "time" "github.com/zilliztech/milvus-distributed/internal/errors" @@ -340,7 +341,7 @@ func (s *Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitio func (s *Master) AllocTimestamp(ctx context.Context, request *internalpb.TsoRequest) (*internalpb.TsoResponse, error) { count := request.GetCount() - ts, err := s.tsoAllocator.GenerateTSO(count) + ts, err := tso.Alloc(count) if err != nil { return &internalpb.TsoResponse{ diff --git a/internal/master/master.go b/internal/master/master.go index 506dd1b8f7..e45d0a0bf1 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -43,9 +43,6 @@ type Master struct { //grpc server grpcServer *grpc.Server - // for tso. - tsoAllocator tso.Allocator - // pulsar client pc *informer.PulsarClient @@ -87,13 +84,13 @@ func newKVBase() *kv.EtcdKV { func CreateServer(ctx context.Context) (*Master, error) { rand.Seed(time.Now().UnixNano()) id.InitGlobalIdAllocator("idTimestamp", newTSOKVBase("gid")) + tso.InitGlobalTsoAllocator("timestamp", newTSOKVBase("tso")) m := &Master{ ctx: ctx, startTimestamp: time.Now().Unix(), kvBase: newKVBase(), ssChan: make(chan internalpb.SegmentStatistics, 10), pc: informer.NewPulsarClient(), - tsoAllocator: tso.NewGlobalTSOAllocator("timestamp", newTSOKVBase("tso")), } m.grpcServer = grpc.NewServer() masterpb.RegisterMasterServer(m.grpcServer, m) diff --git a/internal/master/tso/global_allocator.go b/internal/master/tso/global_allocator.go index d553911675..a8ea4f2c30 100644 --- a/internal/master/tso/global_allocator.go +++ b/internal/master/tso/global_allocator.go @@ -36,8 +36,15 @@ type GlobalTSOAllocator struct { tso *timestampOracle } +var allocator *GlobalTSOAllocator + +func InitGlobalTsoAllocator(key string, base kv.KVBase){ + allocator = NewGlobalTSOAllocator(key, base) +} + + // NewGlobalTSOAllocator creates a new global TSO allocator. -func NewGlobalTSOAllocator(key string, kvBase kv.KVBase) Allocator { +func NewGlobalTSOAllocator(key string, kvBase kv.KVBase) *GlobalTSOAllocator { var saveInterval time.Duration = 3 * time.Second return &GlobalTSOAllocator{ @@ -97,7 +104,34 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { return 0, errors.New("can not get timestamp") } +func (gta *GlobalTSOAllocator) Alloc(count uint32)(typeutil.Timestamp, error) { + //return gta.tso.SyncTimestamp() + start, err := gta.GenerateTSO(count) + if err != nil { + return typeutil.ZeroTimestamp, err + } + //ret := make([]typeutil.Timestamp, count) + //for i:=uint32(0); i < count; i++{ + // ret[i] = start + uint64(i) + //} + return start, err +} + +func (gta *GlobalTSOAllocator) AllocOne()(typeutil.Timestamp, error) { + return gta.GenerateTSO(1) +} + + // Reset is used to reset the TSO allocator. func (gta *GlobalTSOAllocator) Reset() { gta.tso.ResetTimestamp() } + +func AllocOne()(typeutil.Timestamp, error) { + return allocator.AllocOne() +} + +// Reset is used to reset the TSO allocator. +func Alloc(count uint32)(typeutil.Timestamp, error) { + return allocator.Alloc(count) +}