enhance: speed up streaming barrier timetick (#38787)

issue: #38399

- use last allocate but not last confirmed id to make barrier. 
- move the barrier logic into the timetick allocator.
- try to sync up local allocator and remote allocator when first barrier
check not pass to speed up.

Signed-off-by: chyezh <chyezh@outlook.com>
pull/38789/head
Zhen Ye 2024-12-27 15:08:50 +08:00 committed by GitHub
parent a882f341f4
commit 4ba0ed3178
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 135 additions and 32 deletions

View File

@ -21,6 +21,8 @@ import (
"sync"
"time"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
@ -33,7 +35,7 @@ var _ Allocator = (*allocatorImpl)(nil)
// NewTSOAllocator creates a new allocator.
func NewTSOAllocator(rc *syncutil.Future[types.RootCoordClient]) Allocator {
return &allocatorImpl{
mu: sync.Mutex{},
cond: syncutil.NewContextCond(&sync.Mutex{}),
remoteAllocator: newTSOAllocator(rc),
localAllocator: newLocalAllocator(),
}
@ -42,7 +44,7 @@ func NewTSOAllocator(rc *syncutil.Future[types.RootCoordClient]) Allocator {
// NewIDAllocator creates a new allocator.
func NewIDAllocator(rc *syncutil.Future[types.RootCoordClient]) Allocator {
return &allocatorImpl{
mu: sync.Mutex{},
cond: syncutil.NewContextCond(&sync.Mutex{}),
remoteAllocator: newIDAllocator(rc),
localAllocator: newLocalAllocator(),
}
@ -56,6 +58,9 @@ type Allocator interface {
// Allocate allocates a timestamp.
Allocate(ctx context.Context) (uint64, error)
// BarrierUtil make a barrier, next allocate call will generate id greater than barrier.
BarrierUntil(ctx context.Context, barrier uint64) error
// Sync expire the local allocator messages,
// syncs the local allocator and remote allocator.
Sync()
@ -65,37 +70,92 @@ type Allocator interface {
}
type allocatorImpl struct {
mu sync.Mutex
cond *syncutil.ContextCond
remoteAllocator remoteBatchAllocator
lastSyncTime time.Time
lastAllocated uint64
localAllocator *localAllocator
}
// AllocateOne allocates a timestamp.
func (ta *allocatorImpl) Allocate(ctx context.Context) (uint64, error) {
ta.mu.Lock()
defer ta.mu.Unlock()
ta.cond.L.Lock()
defer ta.cond.L.Unlock()
return ta.allocateOne(ctx)
}
func (ta *allocatorImpl) BarrierUntil(ctx context.Context, barrier uint64) error {
err := ta.barrierFastPath(ctx, barrier)
if err == nil {
return nil
}
if !errors.Is(err, errFastPathFailed) {
return err
}
// Fall back to the slow path to avoid block other id allocation opeartions.
ta.cond.L.Lock()
for ta.lastAllocated < barrier {
if err := ta.cond.Wait(ctx); err != nil {
return err
}
}
ta.cond.L.Unlock()
return nil
}
func (ta *allocatorImpl) barrierFastPath(ctx context.Context, barrier uint64) error {
ta.cond.L.Lock()
defer ta.cond.L.Unlock()
for i := 0; i < 2; i++ {
id, err := ta.allocateOne(ctx)
if err != nil {
return err
}
// check if the allocated id is greater than barrier.
if id >= barrier {
return nil
}
if i == 0 {
// force to syncup the local allocator and remote allocator at first time.
// It's the fast path if the barrier is allocated from same remote allocator.
ta.localAllocator.exhausted()
}
}
return errFastPathFailed
}
func (ta *allocatorImpl) allocateOne(ctx context.Context) (uint64, error) {
// allocate one from local allocator first.
if id, err := ta.localAllocator.allocateOne(); err == nil {
ta.lastAllocated = id
ta.cond.UnsafeBroadcast()
return id, nil
}
// allocate from remote.
return ta.allocateRemote(ctx)
id, err := ta.allocateRemote(ctx)
if err != nil {
return 0, err
}
ta.lastAllocated = id
ta.cond.UnsafeBroadcast()
return id, nil
}
// Sync expire the local allocator messages,
// syncs the local allocator and remote allocator.
func (ta *allocatorImpl) Sync() {
ta.mu.Lock()
defer ta.mu.Unlock()
ta.cond.L.Lock()
defer ta.cond.L.Unlock()
ta.localAllocator.exhausted()
}
func (ta *allocatorImpl) SyncIfExpired(expire time.Duration) {
ta.mu.Lock()
defer ta.mu.Unlock()
ta.cond.L.Lock()
defer ta.cond.L.Unlock()
if time.Since(ta.lastSyncTime) > expire {
ta.localAllocator.exhausted()

View File

@ -58,3 +58,52 @@ func TestTimestampAllocator(t *testing.T) {
_, err := allocator.Allocate(context.Background())
assert.Error(t, err)
}
func TestIDAllocator(t *testing.T) {
paramtable.Init()
paramtable.SetNodeID(1)
client := NewMockRootCoordClient(t)
f := syncutil.NewFuture[types.RootCoordClient]()
f.Set(client)
allocator := NewIDAllocator(f)
// Make local dirty
allocator.Allocate(context.Background())
// Test barrier fast path.
resp, err := client.AllocID(context.Background(), &rootcoordpb.AllocIDRequest{
Count: 100,
})
assert.NoError(t, err)
err = allocator.BarrierUntil(context.Background(), uint64(resp.ID))
assert.NoError(t, err)
newBarrierTimeTick, err := allocator.Allocate(context.Background())
assert.NoError(t, err)
assert.Greater(t, newBarrierTimeTick, uint64(resp.ID))
// Test slow path.
ch := make(chan struct{})
go func() {
barrier := newBarrierTimeTick + 1*batchAllocateSize
err := allocator.BarrierUntil(context.Background(), barrier)
assert.NoError(t, err)
newBarrierTimeTick, err := allocator.Allocate(context.Background())
assert.NoError(t, err)
assert.Greater(t, newBarrierTimeTick, barrier)
close(ch)
}()
select {
case <-ch:
assert.Fail(t, "should not finish")
case <-time.After(time.Millisecond * 20):
}
allocator.Sync()
_, err = allocator.Allocate(context.Background())
assert.NoError(t, err)
<-ch
allocator.SyncIfExpired(time.Millisecond * 50)
time.Sleep(time.Millisecond * 10)
allocator.SyncIfExpired(time.Millisecond * 10)
}

View File

@ -15,7 +15,10 @@ import (
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
var errExhausted = errors.New("exhausted")
var (
errExhausted = errors.New("exhausted")
errFastPathFailed = errors.New("fast path failed")
)
// newLocalAllocator creates a new local allocator.
func newLocalAllocator() *localAllocator {

View File

@ -8,13 +8,12 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// AckManager manages the timestampAck.
type AckManager struct {
cond *syncutil.ContextCond
mu sync.Mutex
lastAllocatedTimeTick uint64 // The last allocated time tick, the latest timestamp allocated by the allocator.
lastConfirmedTimeTick uint64 // The last confirmed time tick, the message which time tick less than lastConfirmedTimeTick has been committed into wal.
notAckHeap typeutil.Heap[*Acker] // A minimum heap of timestampAck to search minimum allocated but not ack timestamp in list.
@ -34,7 +33,7 @@ func NewAckManager(
metrics *metricsutil.TimeTickMetrics,
) *AckManager {
return &AckManager{
cond: syncutil.NewContextCond(&sync.Mutex{}),
mu: sync.Mutex{},
lastAllocatedTimeTick: 0,
notAckHeap: typeutil.NewHeap[*Acker](&ackersOrderByTimestamp{}),
ackHeap: typeutil.NewHeap[*Acker](&ackersOrderByEndTimestamp{}),
@ -46,23 +45,18 @@ func NewAckManager(
// AllocateWithBarrier allocates a timestamp with a barrier.
func (ta *AckManager) AllocateWithBarrier(ctx context.Context, barrierTimeTick uint64) (*Acker, error) {
// wait until the lastConfirmedTimeTick is greater than barrierTimeTick.
ta.cond.L.Lock()
for ta.lastConfirmedTimeTick <= barrierTimeTick {
if err := ta.cond.Wait(ctx); err != nil {
return nil, err
}
// Just make a barrier to the underlying allocator.
if err := resource.Resource().TSOAllocator().BarrierUntil(ctx, barrierTimeTick); err != nil {
return nil, err
}
ta.cond.L.Unlock()
return ta.Allocate(ctx)
}
// Allocate allocates a timestamp.
// Concurrent safe to call with Sync and Allocate.
func (ta *AckManager) Allocate(ctx context.Context) (*Acker, error) {
ta.cond.L.Lock()
defer ta.cond.L.Unlock()
ta.mu.Lock()
defer ta.mu.Unlock()
// allocate one from underlying allocator first.
ts, err := resource.Resource().TSOAllocator().Allocate(ctx)
@ -97,8 +91,8 @@ func (ta *AckManager) SyncAndGetAcknowledged(ctx context.Context) ([]*AckDetail,
}
tsWithAck.Ack(OptSync())
ta.cond.L.Lock()
defer ta.cond.L.Unlock()
ta.mu.Lock()
defer ta.mu.Unlock()
details := ta.acknowledgedDetails
ta.acknowledgedDetails = make(sortedDetails, 0, 5)
@ -107,8 +101,8 @@ func (ta *AckManager) SyncAndGetAcknowledged(ctx context.Context) ([]*AckDetail,
// ack marks the timestamp as acknowledged.
func (ta *AckManager) ack(acker *Acker) {
ta.cond.L.Lock()
defer ta.cond.L.Unlock()
ta.mu.Lock()
defer ta.mu.Unlock()
acker.acknowledged = true
acker.detail.EndTimestamp = ta.lastAllocatedTimeTick
@ -129,9 +123,6 @@ func (ta *AckManager) popUntilLastAllAcknowledged() {
return
}
// broadcast to notify the last confirmed timetick updated.
ta.cond.UnsafeBroadcast()
// update last confirmed time tick.
ta.lastConfirmedTimeTick = acknowledgedDetails[len(acknowledgedDetails)-1].BeginTimestamp
ta.metrics.UpdateLastConfirmedTimeTick(ta.lastConfirmedTimeTick)