mirror of https://github.com/milvus-io/milvus.git
parent
7554246ace
commit
5600d06583
|
@ -1,4 +1,4 @@
|
|||
package rmqmsgstream
|
||||
package rmqms
|
||||
|
||||
import (
|
||||
"context"
|
|
@ -0,0 +1,167 @@
|
|||
package rocksmq
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Allocator is a Timestamp Oracle allocator.
|
||||
type Allocator interface {
|
||||
// Initialize is used to initialize a TSO allocator.
|
||||
// It will synchronize TSO with etcd and initialize the
|
||||
// memory for later allocation work.
|
||||
Initialize() error
|
||||
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
|
||||
UpdateTSO() error
|
||||
// SetTSO sets the physical part with given tso. It's mainly used for BR restore
|
||||
// and can not forcibly set the TSO smaller than now.
|
||||
SetTSO(tso uint64) error
|
||||
// GenerateTSO is used to generate a given number of TSOs.
|
||||
// Make sure you have initialized the TSO allocator before calling.
|
||||
GenerateTSO(count uint32) (uint64, error)
|
||||
// Reset is used to reset the TSO allocator.
|
||||
Reset()
|
||||
}
|
||||
|
||||
// GlobalTSOAllocator is the global single point TSO allocator.
|
||||
type GlobalTSOAllocator struct {
|
||||
tso *timestampOracle
|
||||
}
|
||||
|
||||
// NewGlobalTSOAllocator creates a new global TSO allocator.
|
||||
func NewGlobalTSOAllocator(key string, kvBase kv.TxnBase) *GlobalTSOAllocator {
|
||||
var saveInterval = 3 * time.Second
|
||||
return &GlobalTSOAllocator{
|
||||
tso: ×tampOracle{
|
||||
kvBase: kvBase,
|
||||
saveInterval: saveInterval,
|
||||
maxResetTSGap: func() time.Duration { return 3 * time.Second },
|
||||
key: key,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize will initialize the created global TSO allocator.
|
||||
func (gta *GlobalTSOAllocator) Initialize() error {
|
||||
return gta.tso.InitTimestamp()
|
||||
}
|
||||
|
||||
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
|
||||
func (gta *GlobalTSOAllocator) UpdateTSO() error {
|
||||
return gta.tso.UpdateTimestamp()
|
||||
}
|
||||
|
||||
// SetTSO sets the physical part with given tso.
|
||||
func (gta *GlobalTSOAllocator) SetTSO(tso uint64) error {
|
||||
return gta.tso.ResetUserTimestamp(tso)
|
||||
}
|
||||
|
||||
// GenerateTSO is used to generate a given number of TSOs.
|
||||
// Make sure you have initialized the TSO allocator before calling.
|
||||
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
|
||||
var physical, logical int64
|
||||
if count == 0 {
|
||||
return 0, errors.New("tso count should be positive")
|
||||
}
|
||||
|
||||
maxRetryCount := 10
|
||||
|
||||
for i := 0; i < maxRetryCount; i++ {
|
||||
current := (*atomicObject)(atomic.LoadPointer(>a.tso.TSO))
|
||||
if current == nil || current.physical.Equal(typeutil.ZeroTime) {
|
||||
// If it's leader, maybe SyncTimestamp hasn't completed yet
|
||||
log.Println("sync hasn't completed yet, wait for a while")
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
physical = current.physical.UnixNano() / int64(time.Millisecond)
|
||||
logical = atomic.AddInt64(¤t.logical, int64(count))
|
||||
if logical >= maxLogical {
|
||||
log.Println("logical part outside of max logical interval, please check ntp time",
|
||||
zap.Int("retry-count", i))
|
||||
time.Sleep(UpdateTimestampStep)
|
||||
continue
|
||||
}
|
||||
return tsoutil.ComposeTS(physical, logical), nil
|
||||
}
|
||||
return 0, errors.New("can not get timestamp")
|
||||
}
|
||||
|
||||
func (gta *GlobalTSOAllocator) Alloc(count uint32) (typeutil.Timestamp, error) {
|
||||
//return gta.tso.SyncTimestamp()
|
||||
start, err := gta.GenerateTSO(count)
|
||||
if err != nil {
|
||||
return typeutil.ZeroTimestamp, err
|
||||
}
|
||||
//ret := make([]typeutil.Timestamp, count)
|
||||
//for i:=uint32(0); i < count; i++{
|
||||
// ret[i] = start + uint64(i)
|
||||
//}
|
||||
return start, err
|
||||
}
|
||||
|
||||
func (gta *GlobalTSOAllocator) AllocOne() (typeutil.Timestamp, error) {
|
||||
return gta.GenerateTSO(1)
|
||||
}
|
||||
|
||||
// Reset is used to reset the TSO allocator.
|
||||
func (gta *GlobalTSOAllocator) Reset() {
|
||||
gta.tso.ResetTimestamp()
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////
|
||||
|
||||
type IDAllocator interface {
|
||||
Alloc(count uint32) (UniqueID, UniqueID, error)
|
||||
AllocOne() (UniqueID, error)
|
||||
UpdateID() error
|
||||
}
|
||||
|
||||
// GlobalTSOAllocator is the global single point TSO allocator.
|
||||
type GlobalIDAllocator struct {
|
||||
allocator Allocator
|
||||
}
|
||||
|
||||
func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator {
|
||||
return &GlobalIDAllocator{
|
||||
allocator: NewGlobalTSOAllocator(key, base),
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize will initialize the created global TSO allocator.
|
||||
func (gia *GlobalIDAllocator) Initialize() error {
|
||||
return gia.allocator.Initialize()
|
||||
}
|
||||
|
||||
// GenerateTSO is used to generate a given number of TSOs.
|
||||
// Make sure you have initialized the TSO allocator before calling.
|
||||
func (gia *GlobalIDAllocator) Alloc(count uint32) (UniqueID, UniqueID, error) {
|
||||
timestamp, err := gia.allocator.GenerateTSO(count)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
idStart := UniqueID(timestamp)
|
||||
idEnd := idStart + int64(count)
|
||||
return idStart, idEnd, nil
|
||||
}
|
||||
|
||||
func (gia *GlobalIDAllocator) AllocOne() (UniqueID, error) {
|
||||
timestamp, err := gia.allocator.GenerateTSO(1)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
idStart := UniqueID(timestamp)
|
||||
return idStart, nil
|
||||
}
|
||||
|
||||
func (gia *GlobalIDAllocator) UpdateID() error {
|
||||
return gia.allocator.UpdateTSO()
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package rocksmq
|
||||
|
||||
var rmq *RocksMQ
|
||||
|
||||
func InitRmq(rocksdbName string, idAllocator IDAllocator) error {
|
||||
var err error
|
||||
rmq, err = NewRocksMQ(rocksdbName, idAllocator)
|
||||
return err
|
||||
}
|
||||
|
||||
func GetRmq() *RocksMQ {
|
||||
return rmq
|
||||
}
|
|
@ -7,7 +7,6 @@ import (
|
|||
"github.com/tecbot/gorocksdb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
"github.com/zilliztech/milvus-distributed/internal/master"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
|
||||
memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem"
|
||||
|
@ -73,7 +72,7 @@ type RocksMQ struct {
|
|||
kv kv.Base
|
||||
channels map[string]*Channel
|
||||
cgCtxs map[string]ConsumerGroupContext
|
||||
idAllocator master.IDAllocator
|
||||
idAllocator IDAllocator
|
||||
produceMu sync.Mutex
|
||||
consumeMu sync.Mutex
|
||||
//ctx context.Context
|
||||
|
@ -85,7 +84,7 @@ type RocksMQ struct {
|
|||
//tsoTicker *time.Ticker
|
||||
}
|
||||
|
||||
func NewRocksMQ(name string, idAllocator master.IDAllocator) (*RocksMQ, error) {
|
||||
func NewRocksMQ(name string, idAllocator IDAllocator) (*RocksMQ, error) {
|
||||
bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
|
||||
bbto.SetBlockCache(gorocksdb.NewLRUCache(RocksDBLRUCacheCapacity))
|
||||
opts := gorocksdb.NewDefaultOptions()
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
|
||||
"github.com/stretchr/testify/assert"
|
||||
etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
|
||||
master "github.com/zilliztech/milvus-distributed/internal/master"
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
)
|
||||
|
||||
|
@ -20,14 +19,15 @@ func TestFixChannelName(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestRocksMQ(t *testing.T) {
|
||||
master.Init()
|
||||
|
||||
etcdAddr := master.Params.EtcdAddress
|
||||
etcdAddr := os.Getenv("ETCD_ADDRESS")
|
||||
if etcdAddr == "" {
|
||||
etcdAddr = "localhost:2379"
|
||||
}
|
||||
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
|
||||
assert.Nil(t, err)
|
||||
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
|
||||
defer etcdKV.Close()
|
||||
idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV)
|
||||
idAllocator := NewGlobalIDAllocator("dummy", etcdKV)
|
||||
_ = idAllocator.Initialize()
|
||||
|
||||
name := "/tmp/rocksmq"
|
||||
|
@ -76,14 +76,15 @@ func TestRocksMQ(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestRocksMQ_Loop(t *testing.T) {
|
||||
master.Init()
|
||||
|
||||
etcdAddr := master.Params.EtcdAddress
|
||||
etcdAddr := os.Getenv("ETCD_ADDRESS")
|
||||
if etcdAddr == "" {
|
||||
etcdAddr = "localhost:2379"
|
||||
}
|
||||
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
|
||||
assert.Nil(t, err)
|
||||
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
|
||||
defer etcdKV.Close()
|
||||
idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV)
|
||||
idAllocator := NewGlobalIDAllocator("dummy", etcdKV)
|
||||
_ = idAllocator.Initialize()
|
||||
|
||||
name := "/tmp/rocksmq_1"
|
||||
|
@ -143,14 +144,15 @@ func TestRocksMQ_Loop(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestRocksMQ_Goroutines(t *testing.T) {
|
||||
master.Init()
|
||||
|
||||
etcdAddr := master.Params.EtcdAddress
|
||||
etcdAddr := os.Getenv("ETCD_ADDRESS")
|
||||
if etcdAddr == "" {
|
||||
etcdAddr = "localhost:2379"
|
||||
}
|
||||
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}})
|
||||
assert.Nil(t, err)
|
||||
etcdKV := etcdkv.NewEtcdKV(cli, "/etcd/test/root")
|
||||
defer etcdKV.Close()
|
||||
idAllocator := master.NewGlobalIDAllocator("dummy", etcdKV)
|
||||
idAllocator := NewGlobalIDAllocator("dummy", etcdKV)
|
||||
_ = idAllocator.Initialize()
|
||||
|
||||
name := "/tmp/rocksmq_2"
|
||||
|
|
|
@ -0,0 +1,202 @@
|
|||
// Copyright 2016 TiKV Project Authors.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package rocksmq
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
)
|
||||
|
||||
const (
|
||||
// UpdateTimestampStep is used to update timestamp.
|
||||
UpdateTimestampStep = 50 * time.Millisecond
|
||||
// updateTimestampGuard is the min timestamp interval.
|
||||
updateTimestampGuard = time.Millisecond
|
||||
// maxLogical is the max upper limit for logical time.
|
||||
// When a TSO's logical time reaches this limit,
|
||||
// the physical time will be forced to increase.
|
||||
maxLogical = int64(1 << 18)
|
||||
)
|
||||
|
||||
// atomicObject is used to store the current TSO in memory.
|
||||
type atomicObject struct {
|
||||
physical time.Time
|
||||
logical int64
|
||||
}
|
||||
|
||||
// timestampOracle is used to maintain the logic of tso.
|
||||
type timestampOracle struct {
|
||||
key string
|
||||
kvBase kv.TxnBase
|
||||
|
||||
// TODO: remove saveInterval
|
||||
saveInterval time.Duration
|
||||
maxResetTSGap func() time.Duration
|
||||
// For tso, set after the PD becomes a leader.
|
||||
TSO unsafe.Pointer
|
||||
lastSavedTime atomic.Value
|
||||
}
|
||||
|
||||
func (t *timestampOracle) loadTimestamp() (time.Time, error) {
|
||||
strData, err := t.kvBase.Load(t.key)
|
||||
|
||||
var binData []byte = []byte(strData)
|
||||
|
||||
if err != nil {
|
||||
return typeutil.ZeroTime, err
|
||||
}
|
||||
if len(binData) == 0 {
|
||||
return typeutil.ZeroTime, nil
|
||||
}
|
||||
return typeutil.ParseTimestamp(binData)
|
||||
}
|
||||
|
||||
// save timestamp, if lastTs is 0, we think the timestamp doesn't exist, so create it,
|
||||
// otherwise, update it.
|
||||
func (t *timestampOracle) saveTimestamp(ts time.Time) error {
|
||||
data := typeutil.Uint64ToBytes(uint64(ts.UnixNano()))
|
||||
err := t.kvBase.Save(t.key, string(data))
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
t.lastSavedTime.Store(ts)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *timestampOracle) InitTimestamp() error {
|
||||
|
||||
//last, err := t.loadTimestamp()
|
||||
//if err != nil {
|
||||
// return err
|
||||
//}
|
||||
|
||||
next := time.Now()
|
||||
|
||||
// If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`,
|
||||
// the timestamp allocation will start from the saved etcd timestamp temporarily.
|
||||
//if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard {
|
||||
// next = last.Add(updateTimestampGuard)
|
||||
//}
|
||||
|
||||
save := next.Add(t.saveInterval)
|
||||
if err := t.saveTimestamp(save); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next))
|
||||
|
||||
current := &atomicObject{
|
||||
physical: next,
|
||||
}
|
||||
atomic.StorePointer(&t.TSO, unsafe.Pointer(current))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResetUserTimestamp update the physical part with specified tso.
|
||||
func (t *timestampOracle) ResetUserTimestamp(tso uint64) error {
|
||||
physical, _ := tsoutil.ParseTS(tso)
|
||||
next := physical.Add(time.Millisecond)
|
||||
prev := (*atomicObject)(atomic.LoadPointer(&t.TSO))
|
||||
|
||||
// do not update
|
||||
if typeutil.SubTimeByWallClock(next, prev.physical) <= 3*updateTimestampGuard {
|
||||
return errors.New("the specified ts too small than now")
|
||||
}
|
||||
|
||||
if typeutil.SubTimeByWallClock(next, prev.physical) >= t.maxResetTSGap() {
|
||||
return errors.New("the specified ts too large than now")
|
||||
}
|
||||
|
||||
save := next.Add(t.saveInterval)
|
||||
if err := t.saveTimestamp(save); err != nil {
|
||||
return err
|
||||
}
|
||||
update := &atomicObject{
|
||||
physical: next,
|
||||
}
|
||||
atomic.CompareAndSwapPointer(&t.TSO, unsafe.Pointer(prev), unsafe.Pointer(update))
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateTimestamp is used to update the timestamp.
|
||||
// This function will do two things:
|
||||
// 1. When the logical time is going to be used up, increase the current physical time.
|
||||
// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time
|
||||
// will be less than or equal to `updateTimestampGuard`, then the time window needs to be updated and
|
||||
// we also need to save the next physical time plus `TsoSaveInterval` into etcd.
|
||||
//
|
||||
// Here is some constraints that this function must satisfy:
|
||||
// 1. The saved time is monotonically increasing.
|
||||
// 2. The physical time is monotonically increasing.
|
||||
// 3. The physical time is always less than the saved timestamp.
|
||||
func (t *timestampOracle) UpdateTimestamp() error {
|
||||
prev := (*atomicObject)(atomic.LoadPointer(&t.TSO))
|
||||
now := time.Now()
|
||||
|
||||
jetLag := typeutil.SubTimeByWallClock(now, prev.physical)
|
||||
if jetLag > 3*UpdateTimestampStep {
|
||||
log.Print("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now))
|
||||
}
|
||||
|
||||
var next time.Time
|
||||
prevLogical := atomic.LoadInt64(&prev.logical)
|
||||
// If the system time is greater, it will be synchronized with the system time.
|
||||
if jetLag > updateTimestampGuard {
|
||||
next = now
|
||||
} else if prevLogical > maxLogical/2 {
|
||||
// The reason choosing maxLogical/2 here is that it's big enough for common cases.
|
||||
// Because there is enough timestamp can be allocated before next update.
|
||||
log.Print("the logical time may be not enough", zap.Int64("prev-logical", prevLogical))
|
||||
next = prev.physical.Add(time.Millisecond)
|
||||
} else {
|
||||
// It will still use the previous physical time to alloc the timestamp.
|
||||
return nil
|
||||
}
|
||||
|
||||
// It is not safe to increase the physical time to `next`.
|
||||
// The time window needs to be updated and saved to etcd.
|
||||
if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard {
|
||||
save := next.Add(t.saveInterval)
|
||||
if err := t.saveTimestamp(save); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
current := &atomicObject{
|
||||
physical: next,
|
||||
logical: 0,
|
||||
}
|
||||
|
||||
atomic.StorePointer(&t.TSO, unsafe.Pointer(current))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResetTimestamp is used to reset the timestamp.
|
||||
func (t *timestampOracle) ResetTimestamp() {
|
||||
zero := &atomicObject{
|
||||
physical: time.Now(),
|
||||
}
|
||||
atomic.StorePointer(&t.TSO, unsafe.Pointer(zero))
|
||||
}
|
Loading…
Reference in New Issue