Fix RootCoord double updates TSO (#22715) (#22723)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/22746/head
yah01 2023-03-14 10:55:55 +08:00 committed by GitHub
parent 6f647b0f08
commit 802e3469b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 24 additions and 30 deletions

View File

@ -90,7 +90,7 @@ func (ticker *channelsTimeTickerImpl) initCurrents(current Timestamp) {
}
func (ticker *channelsTimeTickerImpl) tick() error {
now, err := ticker.tso.AllocOne()
now, err := ticker.tso.AllocOne(ticker.ctx)
if err != nil {
log.Warn("Proxy channelsTimeTickerImpl failed to get ts from tso", zap.Error(err))
return err
@ -168,7 +168,7 @@ func (ticker *channelsTimeTickerImpl) tickLoop() {
func (ticker *channelsTimeTickerImpl) start() error {
ticker.initStatistics()
current, err := ticker.tso.AllocOne()
current, err := ticker.tso.AllocOne(ticker.ctx)
if err != nil {
return err
}

View File

@ -25,7 +25,7 @@ import (
// use interface tsoAllocator to keep other components testable
// include: channelsTimeTickerImpl, baseTaskQueue, taskScheduler
type tsoAllocator interface {
AllocOne() (Timestamp, error)
AllocOne(ctx context.Context) (Timestamp, error)
}
// use timestampAllocatorInterface to keep other components testable

View File

@ -67,7 +67,7 @@ func newMockTimestampAllocatorInterface() timestampAllocatorInterface {
type mockTsoAllocator struct {
}
func (tso *mockTsoAllocator) AllocOne() (Timestamp, error) {
func (tso *mockTsoAllocator) AllocOne(ctx context.Context) (Timestamp, error) {
return Timestamp(time.Now().UnixNano()), nil
}

View File

@ -206,8 +206,8 @@ func (node *Proxy) Init() error {
node.rowIDAllocator = idAllocator
log.Info("create id allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.GetNodeID()))
log.Info("create timestamp allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.GetNodeID()))
tsoAllocator, err := newTimestampAllocator(node.ctx, node.rootCoord, Params.ProxyCfg.GetNodeID())
log.Debug("create timestamp allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.GetNodeID()))
tsoAllocator, err := newTimestampAllocator(node.rootCoord, Params.ProxyCfg.GetNodeID())
if err != nil {
log.Warn("failed to create timestamp allocator",
zap.Error(err),

View File

@ -169,7 +169,7 @@ func (queue *baseTaskQueue) Enqueue(t task) error {
return err
}
ts, err := queue.tsoAllocatorIns.AllocOne()
ts, err := queue.tsoAllocatorIns.AllocOne(t.TraceCtx())
if err != nil {
return err
}

View File

@ -31,24 +31,23 @@ import (
// timestampAllocator implements tsoAllocator.
type timestampAllocator struct {
ctx context.Context
tso timestampAllocatorInterface
peerID UniqueID
}
// newTimestampAllocator creates a new timestampAllocator
func newTimestampAllocator(ctx context.Context, tso timestampAllocatorInterface, peerID UniqueID) (*timestampAllocator, error) {
func newTimestampAllocator(tso timestampAllocatorInterface, peerID UniqueID) (*timestampAllocator, error) {
a := &timestampAllocator{
ctx: ctx,
peerID: peerID,
tso: tso,
}
return a, nil
}
func (ta *timestampAllocator) alloc(count uint32) ([]Timestamp, error) {
func (ta *timestampAllocator) alloc(ctx context.Context, count uint32) ([]Timestamp, error) {
tr := timerecord.NewTimeRecorder("applyTimestamp")
ctx, cancel := context.WithTimeout(ta.ctx, 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req := &rootcoordpb.AllocTimestampRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO),
@ -61,7 +60,6 @@ func (ta *timestampAllocator) alloc(count uint32) ([]Timestamp, error) {
resp, err := ta.tso.AllocTimestamp(ctx, req)
defer func() {
cancel()
metrics.ProxyApplyTimestampLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
}()
@ -81,8 +79,8 @@ func (ta *timestampAllocator) alloc(count uint32) ([]Timestamp, error) {
}
// AllocOne allocates a timestamp.
func (ta *timestampAllocator) AllocOne() (Timestamp, error) {
ret, err := ta.alloc(1)
func (ta *timestampAllocator) AllocOne(ctx context.Context) (Timestamp, error) {
ret, err := ta.alloc(ctx, 1)
if err != nil {
return 0, err
}

View File

@ -27,11 +27,10 @@ import (
)
func TestNewTimestampAllocator(t *testing.T) {
ctx := context.Background()
tso := newMockTimestampAllocatorInterface()
peerID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
tsAllocator, err := newTimestampAllocator(ctx, tso, peerID)
tsAllocator, err := newTimestampAllocator(tso, peerID)
assert.Nil(t, err)
assert.NotNil(t, tsAllocator)
}
@ -41,12 +40,12 @@ func TestTimestampAllocator_alloc(t *testing.T) {
tso := newMockTimestampAllocatorInterface()
peerID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
tsAllocator, err := newTimestampAllocator(ctx, tso, peerID)
tsAllocator, err := newTimestampAllocator(tso, peerID)
assert.Nil(t, err)
assert.NotNil(t, tsAllocator)
count := rand.Uint32()%100 + 1
ret, err := tsAllocator.alloc(count)
ret, err := tsAllocator.alloc(ctx, count)
assert.Nil(t, err)
assert.Equal(t, int(count), len(ret))
}
@ -56,10 +55,10 @@ func TestTimestampAllocator_AllocOne(t *testing.T) {
tso := newMockTimestampAllocatorInterface()
peerID := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
tsAllocator, err := newTimestampAllocator(ctx, tso, peerID)
tsAllocator, err := newTimestampAllocator(tso, peerID)
assert.Nil(t, err)
assert.NotNil(t, tsAllocator)
_, err = tsAllocator.AllocOne()
_, err = tsAllocator.AllocOne(ctx)
assert.Nil(t, err)
}

View File

@ -245,15 +245,12 @@ func (c *Core) tsLoop() {
select {
case <-tsoTicker.C:
if err := c.tsoAllocator.UpdateTSO(); err != nil {
log.Warn("failed to update timestamp: ", zap.Error(err))
log.Warn("failed to update tso", zap.Error(err))
continue
}
ts := c.tsoAllocator.GetLastSavedTime()
metrics.RootCoordTimestampSaved.Set(float64(ts.Unix()))
if err := c.tsoAllocator.UpdateTSO(); err != nil {
log.Warn("failed to update id: ", zap.Error(err))
continue
}
case <-ctx.Done():
return
}

View File

@ -30,13 +30,13 @@
package tso
import (
"log"
"sync/atomic"
"time"
"errors"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
@ -119,15 +119,15 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
current := (*atomicObject)(atomic.LoadPointer(&gta.tso.TSO))
if current == nil || current.physical.Equal(typeutil.ZeroTime) {
// If it's leader, maybe SyncTimestamp hasn't completed yet
log.Println("sync hasn't completed yet, wait for a while")
log.Info("sync hasn't completed yet, wait for a while")
time.Sleep(200 * time.Millisecond)
continue
}
physical = current.physical.UnixNano() / int64(time.Millisecond)
physical = current.physical.UnixMilli()
logical = atomic.AddInt64(&current.logical, int64(count))
if logical >= maxLogical && gta.LimitMaxLogic {
log.Println("logical part outside of max logical interval, please check ntp time",
log.Info("logical part outside of max logical interval, please check ntp time",
zap.Int("retry-count", i))
time.Sleep(UpdateTimestampStep)
continue