mirror of https://github.com/milvus-io/milvus.git
Refactor flush manager injection to reduce goroutine number (#15180)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/15181/head
parent
384d3169d3
commit
2528b6850d
|
@ -93,7 +93,6 @@ type orderFlushQueue struct {
|
|||
|
||||
injectMut sync.Mutex
|
||||
runningTasks int32
|
||||
injectHandler *injectHandler
|
||||
postInjection postInjectionFunc
|
||||
}
|
||||
|
||||
|
@ -110,7 +109,6 @@ func newOrderFlushQueue(segID UniqueID, f notifyMetaFunc) *orderFlushQueue {
|
|||
// init orderFlushQueue use once protect init, init tailCh
|
||||
func (q *orderFlushQueue) init() {
|
||||
q.Once.Do(func() {
|
||||
q.injectHandler = newInjectHandler(q)
|
||||
// new queue acts like tailing task is done
|
||||
q.tailCh = make(chan struct{})
|
||||
close(q.tailCh)
|
||||
|
@ -125,10 +123,6 @@ func (q *orderFlushQueue) getFlushTaskRunner(pos *internalpb.MsgPosition) *flush
|
|||
// take over injection if task queue is handling it
|
||||
q.injectMut.Lock()
|
||||
q.runningTasks++
|
||||
if q.injectHandler != nil {
|
||||
q.injectHandler.close()
|
||||
q.injectHandler = nil
|
||||
}
|
||||
q.injectMut.Unlock()
|
||||
// add task to tail
|
||||
q.tailMut.Lock()
|
||||
|
@ -146,10 +140,6 @@ func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInj
|
|||
// after descreasing working count, check whether flush queue is empty
|
||||
q.injectMut.Lock()
|
||||
q.runningTasks--
|
||||
// if flush queue is empty, let flush queue take over injection
|
||||
if q.runningTasks == 0 {
|
||||
q.injectHandler = newInjectHandler(q)
|
||||
}
|
||||
// set postInjection function if injection is handled in task
|
||||
if postInjection != nil {
|
||||
q.postInjection = postInjection
|
||||
|
@ -158,6 +148,15 @@ func (q *orderFlushQueue) postTask(pack *segmentFlushPack, postInjection postInj
|
|||
if q.postInjection != nil {
|
||||
q.postInjection(pack)
|
||||
}
|
||||
|
||||
// if flush queue is empty, drain all injection from injectCh
|
||||
if q.runningTasks == 0 {
|
||||
for i := 0; i < len(q.injectCh); i++ {
|
||||
inject := <-q.injectCh
|
||||
go q.handleInject(inject)
|
||||
}
|
||||
}
|
||||
|
||||
q.injectMut.Unlock()
|
||||
}
|
||||
|
||||
|
@ -175,9 +174,31 @@ func (q *orderFlushQueue) enqueueDelFlush(task flushDeleteTask, deltaLogs *DelDa
|
|||
// send into injectCh in there is running task
|
||||
// or perform injection logic here if there is no injection
|
||||
func (q *orderFlushQueue) inject(inject *taskInjection) {
|
||||
q.injectCh <- inject
|
||||
q.injectMut.Lock()
|
||||
defer q.injectMut.Unlock()
|
||||
// check if there are running task(s)
|
||||
// if true, just put injection into injectCh
|
||||
// in case of task misses an injection, the injectCh shall be drained in `postTask`
|
||||
if q.runningTasks > 0 {
|
||||
q.injectCh <- inject
|
||||
return
|
||||
}
|
||||
// otherwise just handle injection here
|
||||
|
||||
q.handleInject(inject)
|
||||
}
|
||||
|
||||
func (q *orderFlushQueue) handleInject(inject *taskInjection) {
|
||||
// notify one injection done
|
||||
inject.injectOne()
|
||||
ok := <-inject.injectOver
|
||||
// apply injection
|
||||
if ok {
|
||||
q.postInjection = inject.postInjection
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
// injectionHandler handles injection for empty flush queue
|
||||
type injectHandler struct {
|
||||
once sync.Once
|
||||
|
@ -204,14 +225,6 @@ func (h *injectHandler) handleInjection(q *orderFlushQueue) {
|
|||
injectDone := make(chan struct{})
|
||||
q.tailCh = injectDone
|
||||
q.tailMut.Unlock()
|
||||
// notify one injection done
|
||||
inject.injectOne()
|
||||
ok := <-inject.injectOver
|
||||
// apply injection
|
||||
if ok {
|
||||
q.postInjection = inject.postInjection
|
||||
}
|
||||
close(injectDone)
|
||||
case <-h.done:
|
||||
return
|
||||
}
|
||||
|
@ -224,6 +237,7 @@ func (h *injectHandler) close() {
|
|||
h.wg.Wait()
|
||||
})
|
||||
}
|
||||
*/
|
||||
|
||||
type dropHandler struct {
|
||||
sync.Mutex
|
||||
|
@ -441,7 +455,7 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique
|
|||
func (m *rendezvousFlushManager) injectFlush(injection *taskInjection, segments ...UniqueID) {
|
||||
go injection.waitForInjected()
|
||||
for _, segmentID := range segments {
|
||||
m.getFlushQueue(segmentID).inject(injection)
|
||||
go m.getFlushQueue(segmentID).inject(injection)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -517,8 +531,8 @@ func (m *rendezvousFlushManager) close() {
|
|||
//assertion ok
|
||||
queue := v.(*orderFlushQueue)
|
||||
queue.injectMut.Lock()
|
||||
if queue.injectHandler != nil {
|
||||
queue.injectHandler.close()
|
||||
for i := 0; i < len(queue.injectCh); i++ {
|
||||
go queue.handleInject(<-queue.injectCh)
|
||||
}
|
||||
queue.injectMut.Unlock()
|
||||
return true
|
||||
|
|
Loading…
Reference in New Issue