mirror of https://github.com/milvus-io/milvus.git
Add global tsoAllocator
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/4973/head^2
parent
b4bd9cf9d6
commit
c442d50c08
|
@ -2,6 +2,7 @@ package master
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/master/tso"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||||
|
@ -340,7 +341,7 @@ func (s *Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitio
|
||||||
|
|
||||||
func (s *Master) AllocTimestamp(ctx context.Context, request *internalpb.TsoRequest) (*internalpb.TsoResponse, error) {
|
func (s *Master) AllocTimestamp(ctx context.Context, request *internalpb.TsoRequest) (*internalpb.TsoResponse, error) {
|
||||||
count := request.GetCount()
|
count := request.GetCount()
|
||||||
ts, err := s.tsoAllocator.GenerateTSO(count)
|
ts, err := tso.Alloc(count)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &internalpb.TsoResponse{
|
return &internalpb.TsoResponse{
|
||||||
|
|
|
@ -43,9 +43,6 @@ type Master struct {
|
||||||
//grpc server
|
//grpc server
|
||||||
grpcServer *grpc.Server
|
grpcServer *grpc.Server
|
||||||
|
|
||||||
// for tso.
|
|
||||||
tsoAllocator tso.Allocator
|
|
||||||
|
|
||||||
// pulsar client
|
// pulsar client
|
||||||
pc *informer.PulsarClient
|
pc *informer.PulsarClient
|
||||||
|
|
||||||
|
@ -87,13 +84,13 @@ func newKVBase() *kv.EtcdKV {
|
||||||
func CreateServer(ctx context.Context) (*Master, error) {
|
func CreateServer(ctx context.Context) (*Master, error) {
|
||||||
rand.Seed(time.Now().UnixNano())
|
rand.Seed(time.Now().UnixNano())
|
||||||
id.InitGlobalIdAllocator("idTimestamp", newTSOKVBase("gid"))
|
id.InitGlobalIdAllocator("idTimestamp", newTSOKVBase("gid"))
|
||||||
|
tso.InitGlobalTsoAllocator("timestamp", newTSOKVBase("tso"))
|
||||||
m := &Master{
|
m := &Master{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
startTimestamp: time.Now().Unix(),
|
startTimestamp: time.Now().Unix(),
|
||||||
kvBase: newKVBase(),
|
kvBase: newKVBase(),
|
||||||
ssChan: make(chan internalpb.SegmentStatistics, 10),
|
ssChan: make(chan internalpb.SegmentStatistics, 10),
|
||||||
pc: informer.NewPulsarClient(),
|
pc: informer.NewPulsarClient(),
|
||||||
tsoAllocator: tso.NewGlobalTSOAllocator("timestamp", newTSOKVBase("tso")),
|
|
||||||
}
|
}
|
||||||
m.grpcServer = grpc.NewServer()
|
m.grpcServer = grpc.NewServer()
|
||||||
masterpb.RegisterMasterServer(m.grpcServer, m)
|
masterpb.RegisterMasterServer(m.grpcServer, m)
|
||||||
|
|
|
@ -36,8 +36,15 @@ type GlobalTSOAllocator struct {
|
||||||
tso *timestampOracle
|
tso *timestampOracle
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var allocator *GlobalTSOAllocator
|
||||||
|
|
||||||
|
func InitGlobalTsoAllocator(key string, base kv.KVBase){
|
||||||
|
allocator = NewGlobalTSOAllocator(key, base)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// NewGlobalTSOAllocator creates a new global TSO allocator.
|
// NewGlobalTSOAllocator creates a new global TSO allocator.
|
||||||
func NewGlobalTSOAllocator(key string, kvBase kv.KVBase) Allocator {
|
func NewGlobalTSOAllocator(key string, kvBase kv.KVBase) *GlobalTSOAllocator {
|
||||||
|
|
||||||
var saveInterval time.Duration = 3 * time.Second
|
var saveInterval time.Duration = 3 * time.Second
|
||||||
return &GlobalTSOAllocator{
|
return &GlobalTSOAllocator{
|
||||||
|
@ -97,7 +104,34 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
|
||||||
return 0, errors.New("can not get timestamp")
|
return 0, errors.New("can not get timestamp")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (gta *GlobalTSOAllocator) Alloc(count uint32)(typeutil.Timestamp, error) {
|
||||||
|
//return gta.tso.SyncTimestamp()
|
||||||
|
start, err := gta.GenerateTSO(count)
|
||||||
|
if err != nil {
|
||||||
|
return typeutil.ZeroTimestamp, err
|
||||||
|
}
|
||||||
|
//ret := make([]typeutil.Timestamp, count)
|
||||||
|
//for i:=uint32(0); i < count; i++{
|
||||||
|
// ret[i] = start + uint64(i)
|
||||||
|
//}
|
||||||
|
return start, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gta *GlobalTSOAllocator) AllocOne()(typeutil.Timestamp, error) {
|
||||||
|
return gta.GenerateTSO(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// Reset is used to reset the TSO allocator.
|
// Reset is used to reset the TSO allocator.
|
||||||
func (gta *GlobalTSOAllocator) Reset() {
|
func (gta *GlobalTSOAllocator) Reset() {
|
||||||
gta.tso.ResetTimestamp()
|
gta.tso.ResetTimestamp()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func AllocOne()(typeutil.Timestamp, error) {
|
||||||
|
return allocator.AllocOne()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset is used to reset the TSO allocator.
|
||||||
|
func Alloc(count uint32)(typeutil.Timestamp, error) {
|
||||||
|
return allocator.Alloc(count)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue