Fix datanode flush manager injection unit test (#15358)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/15399/head
congqixia 2022-01-27 10:23:40 +08:00 committed by GitHub
parent d606aaa69e
commit 273890a91e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 8 deletions

View File

@ -184,8 +184,7 @@ func (q *orderFlushQueue) inject(inject *taskInjection) {
return return
} }
// otherwise just handle injection here // otherwise just handle injection here
go q.handleInject(inject)
q.handleInject(inject)
} }
func (q *orderFlushQueue) handleInject(inject *taskInjection) { func (q *orderFlushQueue) handleInject(inject *taskInjection) {
@ -194,6 +193,8 @@ func (q *orderFlushQueue) handleInject(inject *taskInjection) {
ok := <-inject.injectOver ok := <-inject.injectOver
// apply injection // apply injection
if ok { if ok {
q.injectMut.Lock()
defer q.injectMut.Unlock()
q.postInjection = inject.postInjection q.postInjection = inject.postInjection
} }
} }
@ -455,7 +456,7 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique
func (m *rendezvousFlushManager) injectFlush(injection *taskInjection, segments ...UniqueID) { func (m *rendezvousFlushManager) injectFlush(injection *taskInjection, segments ...UniqueID) {
go injection.waitForInjected() go injection.waitForInjected()
for _, segmentID := range segments { for _, segmentID := range segments {
go m.getFlushQueue(segmentID).inject(injection) m.getFlushQueue(segmentID).inject(injection)
} }
} }

View File

@ -222,7 +222,6 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
rand.Read(id) rand.Read(id)
id2 := make([]byte, 10) id2 := make([]byte, 10)
rand.Read(id2) rand.Read(id2)
rand.Read(id)
m.flushBufferData(nil, 2, true, false, &internalpb.MsgPosition{ m.flushBufferData(nil, 2, true, false, &internalpb.MsgPosition{
MsgID: id, MsgID: id,
}) })
@ -235,16 +234,14 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
}) })
m.injectFlush(ti, 2, 3) m.injectFlush(ti, 2, 3)
go func() {
<-ti.injected
ti.injectDone(true)
}()
m.flushDelData(nil, 2, &internalpb.MsgPosition{ m.flushDelData(nil, 2, &internalpb.MsgPosition{
MsgID: id, MsgID: id,
}) })
m.flushDelData(nil, 3, &internalpb.MsgPosition{ m.flushDelData(nil, 3, &internalpb.MsgPosition{
MsgID: id2, MsgID: id2,
}) })
<-ti.Injected()
ti.injectDone(true)
finish.Wait() finish.Wait()
assert.EqualValues(t, size+2, counter.Load()) assert.EqualValues(t, size+2, counter.Load())