Add tso unittest and EnableMaxLogic

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/4973/head^2
zhenshan.cao 2021-02-26 13:05:52 +08:00 committed by yefu.chen
parent fe5dd84f92
commit 007e7ea858
5 changed files with 136 additions and 4 deletions

View File

@ -18,8 +18,10 @@ type GlobalIDAllocator struct {
}
func NewGlobalIDAllocator(key string, base kv.TxnBase) *GlobalIDAllocator {
allocator := tso.NewGlobalTSOAllocator(key, base)
allocator.EnableMaxLogic(false)
return &GlobalIDAllocator{
allocator: tso.NewGlobalTSOAllocator(key, base),
allocator: allocator,
}
}

View File

@ -0,0 +1,43 @@
package allocator
import (
"os"
"testing"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
)
var gTestIDAllocator *GlobalIDAllocator
func TestGlobalTSOAllocator_All(t *testing.T) {
etcdAddress := os.Getenv("ETCD_ADDRESS")
if etcdAddress == "" {
ip := funcutil.GetLocalIP()
etcdAddress = ip + ":2379"
}
gTestIDAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, "/test/root/kv", "gidTest"))
t.Run("Initialize", func(t *testing.T) {
err := gTestIDAllocator.Initialize()
assert.Nil(t, err)
})
t.Run("AllocOne", func(t *testing.T) {
one, err := gTestIDAllocator.AllocOne()
assert.Nil(t, err)
ano, err := gTestIDAllocator.AllocOne()
assert.Nil(t, err)
assert.NotEqual(t, one, ano)
})
t.Run("Alloc", func(t *testing.T) {
count := uint32(2 << 10)
idStart, idEnd, err := gTestIDAllocator.Alloc(count)
assert.Nil(t, err)
assert.Equal(t, count, uint32(idEnd-idStart))
})
}

View File

@ -45,7 +45,8 @@ type Allocator interface {
// GlobalTSOAllocator is the global single point TSO allocator.
type GlobalTSOAllocator struct {
tso *timestampOracle
tso *timestampOracle
LimitMaxLogic bool
}
// NewGlobalTSOAllocator creates a new global TSO allocator.
@ -58,6 +59,7 @@ func NewGlobalTSOAllocator(key string, kvBase kv.TxnBase) *GlobalTSOAllocator {
maxResetTSGap: func() time.Duration { return 3 * time.Second },
key: key,
},
LimitMaxLogic: true,
}
}
@ -66,6 +68,10 @@ func (gta *GlobalTSOAllocator) Initialize() error {
return gta.tso.InitTimestamp()
}
func (gta *GlobalTSOAllocator) EnableMaxLogic(enable bool) {
gta.LimitMaxLogic = enable
}
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
func (gta *GlobalTSOAllocator) UpdateTSO() error {
return gta.tso.UpdateTimestamp()
@ -97,7 +103,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
physical = current.physical.UnixNano() / int64(time.Millisecond)
logical = atomic.AddInt64(&current.logical, int64(count))
if logical >= maxLogical {
if logical >= maxLogical && gta.LimitMaxLogic {
log.Println("logical part outside of max logical interval, please check ntp time",
zap.Int("retry-count", i))
time.Sleep(UpdateTimestampStep)

View File

@ -0,0 +1,81 @@
package tso
import (
"os"
"testing"
"time"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
)
var gTestTsoAllocator *GlobalTSOAllocator
func TestGlobalTSOAllocator_All(t *testing.T) {
etcdAddress := os.Getenv("ETCD_ADDRESS")
if etcdAddress == "" {
ip := funcutil.GetLocalIP()
etcdAddress = ip + ":2379"
}
gTestTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, "/test/root/kv", "tsoTest"))
t.Run("Initialize", func(t *testing.T) {
err := gTestTsoAllocator.Initialize()
assert.Nil(t, err)
})
t.Run("GenerateTSO", func(t *testing.T) {
count := 1000
perCount := uint32(100)
startTs, err := gTestTsoAllocator.GenerateTSO(perCount)
assert.Nil(t, err)
lastPhysical, lastLogical := tsoutil.ParseTS(startTs)
for i := 0; i < count; i++ {
ts, _ := gTestTsoAllocator.GenerateTSO(perCount)
physical, logical := tsoutil.ParseTS(ts)
if lastPhysical.Equal(physical) {
diff := logical - lastLogical
assert.Equal(t, uint64(perCount), diff)
}
lastPhysical, lastLogical = physical, logical
}
})
gTestTsoAllocator.EnableMaxLogic(false)
t.Run("GenerateTSO2", func(t *testing.T) {
count := 1000
maxL := 1 << 18
startTs, err := gTestTsoAllocator.GenerateTSO(uint32(maxL))
step := 10
perCount := uint32(step) << 18 // 10 ms
assert.Nil(t, err)
lastPhysical, lastLogical := tsoutil.ParseTS(startTs)
for i := 0; i < count; i++ {
ts, _ := gTestTsoAllocator.GenerateTSO(perCount)
physical, logical := tsoutil.ParseTS(ts)
assert.Equal(t, logical, lastLogical)
assert.Equal(t, physical, lastPhysical.Add(time.Millisecond*time.Duration(step)))
lastPhysical = physical
}
})
gTestTsoAllocator.EnableMaxLogic(true)
t.Run("SetTSO", func(t *testing.T) {
curTime := time.Now()
nextTime := curTime.Add(2 * time.Second)
physical := nextTime.UnixNano() / int64(time.Millisecond)
logical := int64(0)
err := gTestTsoAllocator.SetTSO(tsoutil.ComposeTS(physical, logical))
assert.Nil(t, err)
})
t.Run("UpdateTSO", func(t *testing.T) {
err := gTestTsoAllocator.UpdateTSO()
assert.Nil(t, err)
})
t.Run("Reset", func(t *testing.T) {
gTestTsoAllocator.Reset()
})
}

View File

@ -14,7 +14,7 @@ ROOT_DIR="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )"
MILVUS_DIR="${ROOT_DIR}/internal/"
echo $MILVUS_DIR
go test -race -cover "${MILVUS_DIR}/kv/..." -failfast
go test -race -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/tso/..." "${MILVUS_DIR}/allocator/..." -failfast
# TODO: remove to distributed
#go test -race -cover "${MILVUS_DIR}/proxynode/..." -failfast
go test -race -cover "${MILVUS_DIR}/datanode/..." -failfast