From 350dde666dc1d99f6d5a257b595b2be5d74b3fda Mon Sep 17 00:00:00 2001
From: Zhen Ye
Date: Tue, 24 Sep 2024 14:33:13 +0800
Subject: [PATCH] fix: streaming node deadlock (#36403)

issue: #36388

- fix deadlock in the time tick ack manager.
- fix barrier timetick failure in AllocateWithBarrier.

Minimal sketches of the fixed patterns follow the diff.

Signed-off-by: chyezh
---
 .../wal/interceptors/timetick/ack/ack.go      | 11 +--
 .../wal/interceptors/timetick/ack/ack_test.go | 83 +++++++++++++++++++
 .../wal/interceptors/timetick/ack/manager.go  | 17 ++--
 .../timetick/timetick_interceptor.go          |  1 +
 4 files changed, 97 insertions(+), 15 deletions(-)

diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/ack.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/ack.go
index 8c9b00de02..debf70d091 100644
--- a/internal/streamingnode/server/wal/interceptors/timetick/ack/ack.go
+++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/ack.go
@@ -1,8 +1,6 @@
 package ack
 
 import (
-	"go.uber.org/atomic"
-
 	"github.com/milvus-io/milvus/pkg/streaming/util/message"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -14,9 +12,9 @@ var (
 
 // Acker records the timestamp and last confirmed message id that has not been acknowledged.
 type Acker struct {
-	acknowledged *atomic.Bool // is acknowledged.
-	detail       *AckDetail   // info is available after acknowledged.
-	manager      *AckManager  // the manager of the acker.
+	acknowledged bool        // is acknowledged.
+	detail       *AckDetail  // info is available after acknowledged.
+	manager      *AckManager // the manager of the acker.
 }
 
 // LastConfirmedMessageID returns the last confirmed message id.
@@ -34,13 +32,12 @@ func (ta *Acker) Ack(opts ...AckOption) {
 	for _, opt := range opts {
 		opt(ta.detail)
 	}
-	ta.acknowledged.Store(true)
 	ta.manager.ack(ta)
 }
 
 // ackDetail returns the ack info, only can be called after acknowledged.
 func (ta *Acker) ackDetail() *AckDetail {
-	if !ta.acknowledged.Load() {
+	if !ta.acknowledged {
 		panic("unreachable: ackDetail can only be called after acknowledged")
 	}
 	return ta.detail
diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_test.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_test.go
index 95c8e22c15..492fb3957a 100644
--- a/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_test.go
+++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/ack_test.go
@@ -3,7 +3,10 @@ package ack
 import (
 	"context"
 	"fmt"
+	"math/rand"
+	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
@@ -13,6 +16,7 @@ import (
 	"github.com/milvus-io/milvus/internal/mocks"
 	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
 	"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
+	"github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
@@ -133,3 +137,82 @@ func TestAck(t *testing.T) {
 	// no more timestamp to ack.
 	assert.Zero(t, ackManager.notAckHeap.Len())
 }
+
+func TestAckManager(t *testing.T) {
+	paramtable.Init()
+	paramtable.SetNodeID(1)
+
+	ctx := context.Background()
+
+	counter := atomic.NewUint64(1)
+	rc := mocks.NewMockRootCoordClient(t)
+	rc.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).RunAndReturn(
+		func(ctx context.Context, atr *rootcoordpb.AllocTimestampRequest, co ...grpc.CallOption) (*rootcoordpb.AllocTimestampResponse, error) {
+			if atr.Count > 1000 {
+				panic(fmt.Sprintf("count %d is too large", atr.Count))
+			}
+			c := counter.Add(uint64(atr.Count))
+			return &rootcoordpb.AllocTimestampResponse{
+				Status:    merr.Success(),
+				Timestamp: c - uint64(atr.Count),
+				Count:     atr.Count,
+			}, nil
+		},
+	)
+	resource.InitForTest(t, resource.OptRootCoordClient(rc))
+
+	ackManager := NewAckManager(0, walimplstest.NewTestMessageID(0))
+
+	// Concurrently collect acknowledged details while the allocations and acks below run.
+	wg := sync.WaitGroup{}
+	details := make([]*AckDetail, 0, 10)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			currentDetails, err := ackManager.SyncAndGetAcknowledged(ctx)
+			assert.NoError(t, err)
+			for _, d := range currentDetails {
+				if !d.IsSync {
+					details = append(details, d)
+				}
+			}
+			if len(details) == 1100 { // 1000 plain + 100 barrier allocations.
+				break
+			}
+			time.Sleep(200 * time.Millisecond)
+		}
+	}()
+	for i := 0; i < 1000; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			time.Sleep(time.Duration(rand.Int31n(5)) * time.Millisecond)
+			ts, err := ackManager.Allocate(ctx)
+			assert.NoError(t, err)
+			time.Sleep(time.Duration(rand.Int31n(5)) * time.Millisecond)
+			id, err := resource.Resource().TSOAllocator().Allocate(ctx)
+			assert.NoError(t, err)
+			ts.Ack(
+				OptMessageID(walimplstest.NewTestMessageID(int64(id))),
+			)
+		}()
+	}
+	for i := 0; i < 100; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			time.Sleep(time.Duration(rand.Int31n(5)) * time.Millisecond)
+			ts, err := ackManager.AllocateWithBarrier(ctx, uint64(i*10))
+			assert.NoError(t, err)
+			assert.Greater(t, ts.Timestamp(), uint64(i*10)) // the barrier guarantees a strictly greater timestamp.
+			time.Sleep(time.Duration(rand.Int31n(5)) * time.Millisecond)
+			id, err := resource.Resource().TSOAllocator().Allocate(ctx)
+			assert.NoError(t, err)
+			ts.Ack(
+				OptMessageID(walimplstest.NewTestMessageID(int64(id))),
+			)
+		}(i)
+	}
+	wg.Wait()
+}
diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/manager.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/manager.go
index a34f897b07..cc7e518d51 100644
--- a/internal/streamingnode/server/wal/interceptors/timetick/ack/manager.go
+++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/manager.go
@@ -4,8 +4,6 @@ import (
 	"context"
 	"sync"
 
-	"go.uber.org/atomic"
-
 	"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
 	"github.com/milvus-io/milvus/pkg/streaming/util/message"
 	"github.com/milvus-io/milvus/pkg/util/syncutil"
@@ -18,7 +16,9 @@ type AckManager struct {
 	lastAllocatedTimeTick uint64                // The last allocated time tick, the latest timestamp allocated by the allocator.
 	lastConfirmedTimeTick uint64                // The last confirmed time tick, the message which time tick less than lastConfirmedTimeTick has been committed into wal.
 	notAckHeap            typeutil.Heap[*Acker] // A minimum heap of timestampAck to search minimum allocated but not ack timestamp in list.
-	ackHeap               typeutil.Heap[*Acker] // A minimum heap of timestampAck to search minimum ack timestamp in list.
+	// Actually, the notAckHeap could be replaced by a list because the allocate operation is protected by the mutex;
+	// it is kept as a heap to make the code more readable.
+	ackHeap typeutil.Heap[*Acker] // A minimum heap of timestampAck to search minimum ack timestamp in list.
 	// It is used to detect the concurrent operation to find the last confirmed message id.
 	acknowledgedDetails  sortedDetails         // All ack details which time tick less than lastConfirmedTimeTick will be temporarily kept here until sync operation happens.
 	lastConfirmedManager *lastConfirmedManager // The last confirmed message id manager.
@@ -43,7 +43,7 @@ func NewAckManager(
 func (ta *AckManager) AllocateWithBarrier(ctx context.Context, barrierTimeTick uint64) (*Acker, error) {
 	// wait until the lastConfirmedTimeTick is greater than barrierTimeTick.
 	ta.cond.L.Lock()
-	if ta.lastConfirmedTimeTick <= barrierTimeTick {
+	for ta.lastConfirmedTimeTick <= barrierTimeTick {
 		if err := ta.cond.Wait(ctx); err != nil {
 			return nil, err
 		}
@@ -69,7 +69,7 @@ func (ta *AckManager) Allocate(ctx context.Context) (*Acker, error) {
 	// create new timestampAck for ack process.
 	// add ts to heap wait for ack.
 	acker := &Acker{
-		acknowledged: atomic.NewBool(false),
+		acknowledged: false,
 		detail:       newAckDetail(ts, ta.lastConfirmedManager.GetLastConfirmedMessageID()),
 		manager:      ta,
 	}
@@ -104,6 +104,7 @@ func (ta *AckManager) ack(acker *Acker) {
 	ta.cond.L.Lock()
 	defer ta.cond.L.Unlock()
 
+	acker.acknowledged = true
 	acker.detail.EndTimestamp = ta.lastAllocatedTimeTick
 	ta.ackHeap.Push(acker)
 	ta.popUntilLastAllAcknowledged()
@@ -113,7 +114,7 @@ func (ta *AckManager) popUntilLastAllAcknowledged() {
 	// pop all acknowledged timestamps.
 	acknowledgedDetails := make(sortedDetails, 0, 5)
-	for ta.notAckHeap.Len() > 0 && ta.notAckHeap.Peek().acknowledged.Load() {
+	for ta.notAckHeap.Len() > 0 && ta.notAckHeap.Peek().acknowledged {
 		ack := ta.notAckHeap.Pop()
 		acknowledgedDetails = append(acknowledgedDetails, ack.ackDetail())
 	}
@@ -128,8 +129,8 @@
 	ta.lastConfirmedTimeTick = acknowledgedDetails[len(acknowledgedDetails)-1].BeginTimestamp
 
 	// pop all EndTimestamp is less than lastConfirmedTimeTick.
-	// The message which EndTimetick less than lastConfirmedTimeTick has all been committed into wal.
-	// So the MessageID of the messages is dense and continuous.
+	// All the messages whose EndTimestamp is less than lastConfirmedTimeTick have been committed into the wal.
+	// So the MessageID of those messages is dense and continuous.
 	confirmedDetails := make(sortedDetails, 0, 5)
 	for ta.ackHeap.Len() > 0 && ta.ackHeap.Peek().detail.EndTimestamp < ta.lastConfirmedTimeTick {
 		ack := ta.ackHeap.Pop()
diff --git a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go
index 1acb87c22a..e42574117b 100644
--- a/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go
+++ b/internal/streamingnode/server/wal/interceptors/timetick/timetick_interceptor.go
@@ -89,6 +89,7 @@ func (impl *timeTickAppendInterceptor) DoAppend(ctx context.Context, msg message
 	defer func() {
 		if err != nil {
 			txnSession.AddNewMessageFail()
+			return
 		}
 		// perform keepalive for the transaction session if append success.
 		txnSession.AddNewMessageDoneAndKeepalive(msg.TimeTick())
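
Note on the AllocateWithBarrier change: waking from a condition variable does
not by itself mean the predicate holds. A broadcast wakes every waiter,
including waiters whose own barrier has not been passed yet, so an if-check
lets a woken waiter proceed and hand out a timestamp at or below its barrier;
the for loop re-checks lastConfirmedTimeTick after every wakeup. Below is a
minimal sketch of the pattern using the standard library's sync.Cond; the
production code uses syncutil.ContextCond, whose Wait also takes a context,
and all names in the sketch are illustrative, not the real types.

package ackexample

import "sync"

// tickBarrier is a toy stand-in for the AckManager's condition variable
// around lastConfirmedTimeTick.
type tickBarrier struct {
	mu   sync.Mutex
	cond *sync.Cond
	tick uint64
}

func newTickBarrier() *tickBarrier {
	b := &tickBarrier{}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// waitPast blocks until tick is strictly greater than barrier. The loop,
// not an if, is the point: a waiter may wake while its own predicate is
// still false and must check it again before proceeding.
func (b *tickBarrier) waitPast(barrier uint64) uint64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.tick <= barrier {
		b.cond.Wait()
	}
	return b.tick
}

// advance moves the tick forward and wakes all waiters to re-check.
func (b *tickBarrier) advance(to uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if to > b.tick {
		b.tick = to
		b.cond.Broadcast()
	}
}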
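Note on the acknowledged flag: before this patch, Ack stored an atomic flag
before manager.ack acquired the manager mutex, so a goroutine already inside
popUntilLastAllAcknowledged could observe acknowledged == true for an acker
whose EndTimestamp was not yet set and which was not yet pushed into ackHeap,
and pop its half-initialized detail from notAckHeap. Making the flag a plain
bool written under the same mutex that guards both heaps turns "mark
acknowledged, set EndTimestamp, push to ackHeap" into a single critical
section. A sketch of the publish-under-lock pattern, with slices standing in
for the heaps and all types illustrative rather than the real ones:

package ackexample

import "sync"

// item pairs a flag with data that must become visible together.
type item struct {
	done bool   // read and written only while holding ackOwner.mu
	end  uint64 // meaningful only once done is true
}

type ackOwner struct {
	mu      sync.Mutex
	pending []*item // stands in for notAckHeap
	acked   []*item // stands in for ackHeap
}

// ack publishes the flag, the payload, and the heap move in one critical
// section: any reader holding mu sees all of it or none of it.
func (o *ackOwner) ack(it *item, end uint64) {
	o.mu.Lock()
	defer o.mu.Unlock()
	it.done = true
	it.end = end
	o.acked = append(o.acked, it)
	o.drainLocked()
}

// drainLocked pops pending items whose flag is set. Because done is only
// written under mu, a done item here always has end set and is already
// present in acked.
func (o *ackOwner) drainLocked() {
	for len(o.pending) > 0 && o.pending[0].done {
		o.pending = o.pending[1:]
	}
}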
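Note on the timetick_interceptor.go change: without the added return, the
deferred closure fell through after AddNewMessageFail and also called
AddNewMessageDoneAndKeepalive, so a failed append was additionally recorded
against the transaction session as done. A minimal sketch of the intended
failure/success split in a deferred closure; the txnStats type and the
callback are illustrative, not the real session API:

package ackexample

type txnStats struct{ failed, done int }

// doAppend mirrors the deferred bookkeeping in DoAppend: the early return
// on the error path keeps a failed append from also being counted as done.
func doAppend(s *txnStats, appendFn func() error) (err error) {
	defer func() {
		if err != nil {
			s.failed++ // failure bookkeeping only
			return     // the return added by this patch
		}
		s.done++ // success bookkeeping runs only when err == nil
	}()
	return appendFn()
}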