fix: streaming node deadlock (#36403)

issue: #36388

- fix deadlock (see the condition-wait sketch after the change summary).
- fix barrier timetick failure.

Signed-off-by: chyezh <chyezh@outlook.com>
pull/36470/head
Zhen Ye 2024-09-24 14:33:13 +08:00 committed by GitHub
parent d55d9d6e1d
commit 350dde666d
4 changed files with 97 additions and 15 deletions
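The barrier fix in `AllocateWithBarrier` below follows the classic condition-variable rule: re-check the wait predicate in a loop after every wakeup, since a single `if` check lets the waiter continue even when the condition is still unmet. Below is a minimal sketch of that wait/notify pattern using the standard library's `sync.Cond`; the real `AckManager` uses Milvus's context-aware `syncutil` condition, so the `barrier` type and its fields here are illustrative stand-ins only.

package sketch

import "sync"

// barrier is a simplified stand-in for the AckManager's confirmed-time-tick
// tracking; it is not the real implementation.
type barrier struct {
	mu                    sync.Mutex
	cond                  *sync.Cond
	lastConfirmedTimeTick uint64
}

func newBarrier() *barrier {
	b := &barrier{}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// waitForBarrier blocks until lastConfirmedTimeTick exceeds barrierTimeTick.
// Using `for` instead of `if` makes every wakeup re-evaluate the predicate,
// so an early or spurious signal cannot release the waiter too soon.
func (b *barrier) waitForBarrier(barrierTimeTick uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.lastConfirmedTimeTick <= barrierTimeTick {
		b.cond.Wait()
	}
}

// advance publishes a newly confirmed time tick and wakes all waiters.
func (b *barrier) advance(tt uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if tt > b.lastConfirmedTimeTick {
		b.lastConfirmedTimeTick = tt
	}
	b.cond.Broadcast()
}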

@@ -1,8 +1,6 @@
package ack
import (
"go.uber.org/atomic"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -14,9 +12,9 @@ var (
// Acker records the timestamp and last confirmed message id that has not been acknowledged.
type Acker struct {
acknowledged *atomic.Bool // is acknowledged.
detail *AckDetail // info is available after acknowledged.
manager *AckManager // the manager of the acker.
acknowledged bool // is acknowledged.
detail *AckDetail // info is available after acknowledged.
manager *AckManager // the manager of the acker.
}
// LastConfirmedMessageID returns the last confirmed message id.
@@ -34,13 +32,12 @@ func (ta *Acker) Ack(opts ...AckOption) {
for _, opt := range opts {
opt(ta.detail)
}
ta.acknowledged.Store(true)
ta.manager.ack(ta)
}
// ackDetail returns the ack info, only can be called after acknowledged.
func (ta *Acker) ackDetail() *AckDetail {
if !ta.acknowledged.Load() {
if !ta.acknowledged {
panic("unreachable: ackDetail can only be called after acknowledged")
}
return ta.detail

@@ -3,7 +3,10 @@ package ack
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
@@ -13,6 +16,7 @@ import (
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@@ -133,3 +137,82 @@ func TestAck(t *testing.T) {
// no more timestamp to ack.
assert.Zero(t, ackManager.notAckHeap.Len())
}
func TestAckManager(t *testing.T) {
paramtable.Init()
paramtable.SetNodeID(1)
ctx := context.Background()
counter := atomic.NewUint64(1)
rc := mocks.NewMockRootCoordClient(t)
rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, atr *rootcoordpb.AllocTimestampRequest, co ...grpc.CallOption) (*rootcoordpb.AllocTimestampResponse, error) {
if atr.Count > 1000 {
panic(fmt.Sprintf("count %d is too large", atr.Count))
}
c := counter.Add(uint64(atr.Count))
return &rootcoordpb.AllocTimestampResponse{
Status: merr.Success(),
Timestamp: c - uint64(atr.Count),
Count: atr.Count,
}, nil
},
)
resource.InitForTest(t, resource.OptRootCoordClient(rc))
ackManager := NewAckManager(0, walimplstest.NewTestMessageID(0))
// Test Concurrent Collect.
wg := sync.WaitGroup{}
details := make([]*AckDetail, 0, 10)
wg.Add(1)
go func() {
defer wg.Done()
for {
currentDetails, err := ackManager.SyncAndGetAcknowledged(ctx)
assert.NoError(t, err)
for _, d := range currentDetails {
if !d.IsSync {
details = append(details, d)
}
}
if len(details) == 1100 {
break
}
time.Sleep(200 * time.Millisecond)
}
}()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Duration(rand.Int31n(5)) * time.Millisecond)
ts, err := ackManager.Allocate(ctx)
assert.NoError(t, err)
time.Sleep(time.Duration(rand.Int31n(5)) * time.Millisecond)
id, err := resource.Resource().TSOAllocator().Allocate(ctx)
assert.NoError(t, err)
ts.Ack(
OptMessageID(walimplstest.NewTestMessageID(int64(id))),
)
}()
}
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
time.Sleep(time.Duration(rand.Int31n(5)) * time.Millisecond)
ts, err := ackManager.AllocateWithBarrier(ctx, uint64(i*10))
assert.NoError(t, err)
assert.Greater(t, ts.Timestamp(), uint64(i*10))
time.Sleep(time.Duration(rand.Int31n(5)) * time.Millisecond)
id, err := resource.Resource().TSOAllocator().Allocate(ctx)
assert.NoError(t, err)
ts.Ack(
OptMessageID(walimplstest.NewTestMessageID(int64(id))),
)
}(i)
}
wg.Wait()
}

@@ -4,8 +4,6 @@ import (
"context"
"sync"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/syncutil"
@@ -18,7 +16,9 @@ type AckManager struct {
lastAllocatedTimeTick uint64 // The last allocated time tick, the latest timestamp allocated by the allocator.
lastConfirmedTimeTick uint64 // The last confirmed time tick, the message which time tick less than lastConfirmedTimeTick has been committed into wal.
notAckHeap typeutil.Heap[*Acker] // A minimum heap of timestampAck to search minimum allocated but not ack timestamp in list.
ackHeap typeutil.Heap[*Acker] // A minimum heap of timestampAck to search minimum ack timestamp in list.
// Actually, the notAckHeap can be replaced by a list because the allocate operation is protected by the mutex;
// keep it as a heap to make the code more readable.
ackHeap typeutil.Heap[*Acker] // A minimum heap of timestampAck to search minimum ack timestamp in list.
// It is used to detect the concurrent operation to find the last confirmed message id.
acknowledgedDetails sortedDetails // All ack details which time tick less than lastConfirmedTimeTick will be temporarily kept here until sync operation happens.
lastConfirmedManager *lastConfirmedManager // The last confirmed message id manager.
@@ -43,7 +43,7 @@ func NewAckManager(
func (ta *AckManager) AllocateWithBarrier(ctx context.Context, barrierTimeTick uint64) (*Acker, error) {
// wait until the lastConfirmedTimeTick is greater than barrierTimeTick.
ta.cond.L.Lock()
if ta.lastConfirmedTimeTick <= barrierTimeTick {
for ta.lastConfirmedTimeTick <= barrierTimeTick {
if err := ta.cond.Wait(ctx); err != nil {
return nil, err
}
@@ -69,7 +69,7 @@ func (ta *AckManager) Allocate(ctx context.Context) (*Acker, error) {
// create new timestampAck for ack process.
// add ts to heap wait for ack.
acker := &Acker{
acknowledged: atomic.NewBool(false),
acknowledged: false,
detail: newAckDetail(ts, ta.lastConfirmedManager.GetLastConfirmedMessageID()),
manager: ta,
}
@@ -104,6 +104,7 @@ func (ta *AckManager) ack(acker *Acker) {
ta.cond.L.Lock()
defer ta.cond.L.Unlock()
acker.acknowledged = true
acker.detail.EndTimestamp = ta.lastAllocatedTimeTick
ta.ackHeap.Push(acker)
ta.popUntilLastAllAcknowledged()
@@ -113,7 +114,7 @@ func (ta *AckManager) popUntilLastAllAcknowledged() {
func (ta *AckManager) popUntilLastAllAcknowledged() {
// pop all acknowledged timestamps.
acknowledgedDetails := make(sortedDetails, 0, 5)
for ta.notAckHeap.Len() > 0 && ta.notAckHeap.Peek().acknowledged.Load() {
for ta.notAckHeap.Len() > 0 && ta.notAckHeap.Peek().acknowledged {
ack := ta.notAckHeap.Pop()
acknowledgedDetails = append(acknowledgedDetails, ack.ackDetail())
}
@@ -128,8 +129,8 @@ func (ta *AckManager) popUntilLastAllAcknowledged() {
ta.lastConfirmedTimeTick = acknowledgedDetails[len(acknowledgedDetails)-1].BeginTimestamp
// pop all EndTimestamp is less than lastConfirmedTimeTick.
// The message which EndTimetick less than lastConfirmedTimeTick has all been committed into wal.
// So the MessageID of the messages is dense and continuous.
// All the messages which EndTimetick less than lastConfirmedTimeTick have been committed into wal.
// So the MessageID of those messages is dense and continuous.
confirmedDetails := make(sortedDetails, 0, 5)
for ta.ackHeap.Len() > 0 && ta.ackHeap.Peek().detail.EndTimestamp < ta.lastConfirmedTimeTick {
ack := ta.ackHeap.Pop()
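Note on the struct change above: with this patch the `acknowledged` flag is set inside `AckManager.ack` while `ta.cond.L` is held and read inside `popUntilLastAllAcknowledged` (and `ackDetail`) under that same lock, which is why `Acker.acknowledged` drops from `*atomic.Bool` to a plain `bool`. A rough sketch of that invariant follows, with simplified stand-in types in place of the real heap and detail structures.

package sketch

import "sync"

// acker and manager are simplified stand-ins for the real Acker/AckManager.
type acker struct {
	acknowledged bool   // written and read only while holding manager.mu
	begin        uint64 // allocated (begin) timestamp
}

type manager struct {
	mu            sync.Mutex
	notAcked      []*acker // pretend min-heap ordered by begin timestamp
	confirmedTick uint64
}

// ack marks one acker as done under the lock, then pops every leading acker
// that is already acknowledged; the flag never needs to be atomic because
// no code path touches it without holding mu.
func (m *manager) ack(a *acker) {
	m.mu.Lock()
	defer m.mu.Unlock()
	a.acknowledged = true
	for len(m.notAcked) > 0 && m.notAcked[0].acknowledged {
		m.confirmedTick = m.notAcked[0].begin
		m.notAcked = m.notAcked[1:]
	}
}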

@@ -89,6 +89,7 @@ func (impl *timeTickAppendInterceptor) DoAppend(ctx context.Context, msg message
defer func() {
if err != nil {
txnSession.AddNewMessageFail()
return
}
// perform keepalive for the transaction session if append success.
txnSession.AddNewMessageDoneAndKeepalive(msg.TimeTick())
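The final hunk adds an early `return` inside the deferred closure so a failed append only records the failure and does not also run the success-path keepalive. A minimal sketch of that pattern; `session` and its methods below are hypothetical stand-ins, not the real `txnSession` API.

package sketch

// session is a hypothetical stand-in for the transaction session.
type session struct {
	fails int
	keeps int
}

func (s *session) addNewMessageFail()             { s.fails++ }
func (s *session) addNewMessageDoneAndKeepalive() { s.keeps++ }

// doAppend mirrors the fixed control flow: without the early return in the
// deferred closure, a failed append would also be counted as done and would
// trigger the keepalive.
func doAppend(s *session, appendFn func() error) (err error) {
	defer func() {
		if err != nil {
			s.addNewMessageFail()
			return // do not fall through to the success-path keepalive
		}
		s.addNewMessageDoneAndKeepalive()
	}()
	return appendFn()
}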