From 273890a91e778e7894d310861a74122fa31b7fe2 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 27 Jan 2022 10:23:40 +0800 Subject: [PATCH] Fix datanode flush manager injection unit test (#15358) Signed-off-by: Congqi Xia --- internal/datanode/flush_manager.go | 7 ++++--- internal/datanode/flush_manager_test.go | 7 ++----- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index b91f3ce3dd..84ff5fd7c6 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -184,8 +184,7 @@ func (q *orderFlushQueue) inject(inject *taskInjection) { return } // otherwise just handle injection here - - q.handleInject(inject) + go q.handleInject(inject) } func (q *orderFlushQueue) handleInject(inject *taskInjection) { @@ -194,6 +193,8 @@ func (q *orderFlushQueue) handleInject(inject *taskInjection) { ok := <-inject.injectOver // apply injection if ok { + q.injectMut.Lock() + defer q.injectMut.Unlock() q.postInjection = inject.postInjection } } @@ -455,7 +456,7 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique func (m *rendezvousFlushManager) injectFlush(injection *taskInjection, segments ...UniqueID) { go injection.waitForInjected() for _, segmentID := range segments { - go m.getFlushQueue(segmentID).inject(injection) + m.getFlushQueue(segmentID).inject(injection) } } diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go index 3cd971a217..cbbc3180d3 100644 --- a/internal/datanode/flush_manager_test.go +++ b/internal/datanode/flush_manager_test.go @@ -222,7 +222,6 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { rand.Read(id) id2 := make([]byte, 10) rand.Read(id2) - rand.Read(id) m.flushBufferData(nil, 2, true, false, &internalpb.MsgPosition{ MsgID: id, }) @@ -235,16 +234,14 @@ func TestRendezvousFlushManager_Inject(t *testing.T) { }) m.injectFlush(ti, 2, 3) - go func() { - <-ti.injected - ti.injectDone(true) - }() m.flushDelData(nil, 2, &internalpb.MsgPosition{ MsgID: id, }) m.flushDelData(nil, 3, &internalpb.MsgPosition{ MsgID: id2, }) + <-ti.Injected() + ti.injectDone(true) finish.Wait() assert.EqualValues(t, size+2, counter.Load())