Fix flush manager injection logic with multiple segments (#12260)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/12258/head
congqixia 2021-11-24 18:41:16 +08:00 committed by GitHub
parent e3f49858c5
commit 59d19384dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 107 additions and 60 deletions

View File

@ -296,17 +296,13 @@ func (t *compactionTask) compact() error {
}
// Inject to stop flush
ti := taskInjection{
injected: make(chan struct{}),
injectOver: make(chan bool),
postInjection: func(pack *segmentFlushPack) {
pack.segmentID = targetSegID
},
}
ti := newTaskInjection(len(segIDs), func(pack *segmentFlushPack) {
pack.segmentID = targetSegID
})
defer close(ti.injectOver)
t.injectFlush(ti, segIDs...)
<-ti.injected
<-ti.Injected()
var (
iItr = make([]iterator, 0)
@ -449,7 +445,7 @@ func (t *compactionTask) compact() error {
}
}
ti.injectOver <- true
ti.injectDone(true)
log.Info("compaction done", zap.Int64("planID", t.plan.GetPlanID()),
zap.Any("num of binlog paths", len(cpaths.inPaths)),
zap.Any("num of stats paths", len(cpaths.statsPaths)),

View File

@ -552,10 +552,11 @@ func (mfm *mockFlushManager) flushDelData(data *DelDataBuf, segmentID UniqueID,
return nil
}
func (mfm *mockFlushManager) injectFlush(injection taskInjection, segments ...UniqueID) {
func (mfm *mockFlushManager) injectFlush(injection *taskInjection, segments ...UniqueID) {
go func() {
time.Sleep(time.Second * time.Duration(mfm.sleepSeconds))
injection.injected <- struct{}{}
//injection.injected <- struct{}{}
close(injection.injected)
<-injection.injectOver
}()
}

View File

@ -41,7 +41,7 @@ type flushManager interface {
// notify flush manager del buffer data
flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error
// injectFlush injects compaction or other blocking task before flush sync
injectFlush(injection taskInjection, segments ...UniqueID)
injectFlush(injection *taskInjection, segments ...UniqueID)
// close handles resource clean up
close()
}
@ -73,7 +73,7 @@ var _ flushManager = (*rendezvousFlushManager)(nil)
type orderFlushQueue struct {
sync.Once
segmentID UniqueID
injectCh chan taskInjection
injectCh chan *taskInjection
// MsgID => flushTask
working sync.Map
@ -93,7 +93,7 @@ func newOrderFlushQueue(segID UniqueID, f notifyMetaFunc) *orderFlushQueue {
q := &orderFlushQueue{
segmentID: segID,
notifyFunc: f,
injectCh: make(chan taskInjection, 100),
injectCh: make(chan *taskInjection, 100),
}
return q
}
@ -159,7 +159,7 @@ func (q *orderFlushQueue) enqueueDelFlush(task flushDeleteTask, deltaLogs *DelDa
// inject performs injection for current task queue
// send into injectCh in there is running task
// or perform injection logic here if there is no injection
func (q *orderFlushQueue) inject(inject taskInjection) {
func (q *orderFlushQueue) inject(inject *taskInjection) {
q.injectCh <- inject
}
@ -187,8 +187,13 @@ func (h *injectHandler) handleInjection(q *orderFlushQueue) {
injectDone := make(chan struct{})
q.tailCh = injectDone
q.tailMut.Unlock()
inject.injected <- struct{}{}
<-inject.injectOver
// notify one injection done
inject.injectOne()
ok := <-inject.injectOver
// apply injection
if ok {
q.postInjection = inject.postInjection
}
close(injectDone)
case <-h.done:
return
@ -346,7 +351,8 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique
}
// injectFlush inject process before task finishes
func (m *rendezvousFlushManager) injectFlush(injection taskInjection, segments ...UniqueID) {
func (m *rendezvousFlushManager) injectFlush(injection *taskInjection, segments ...UniqueID) {
go injection.waitForInjected()
for _, segmentID := range segments {
m.getFlushQueue(segmentID).inject(injection)
}

View File

@ -177,23 +177,20 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
var counter atomic.Int64
finish := sync.WaitGroup{}
finish.Add(size)
packs := make([]*segmentFlushPack, 0, size+1)
var packMut sync.Mutex
packs := make([]*segmentFlushPack, 0, size+3)
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
packMut.Lock()
packs = append(packs, pack)
packMut.Unlock()
counter.Inc()
finish.Done()
})
injected := make(chan struct{})
injectOver := make(chan bool)
m.injectFlush(taskInjection{
injected: injected,
injectOver: injectOver,
postInjection: func(*segmentFlushPack) {
},
}, 1)
<-injected
injectOver <- true
ti := newTaskInjection(1, func(*segmentFlushPack) {})
m.injectFlush(ti, 1)
<-ti.injected
ti.injectDone(true)
ids := make([][]byte, 0, size)
for i := 0; i < size; i++ {
@ -218,44 +215,60 @@ func TestRendezvousFlushManager_Inject(t *testing.T) {
assert.EqualValues(t, size, counter.Load())
finish.Add(1)
finish.Add(2)
id := make([]byte, 10)
rand.Read(id)
id2 := make([]byte, 10)
rand.Read(id2)
rand.Read(id)
m.flushBufferData(nil, 2, true, false, &internalpb.MsgPosition{
MsgID: id,
})
m.flushBufferData(nil, 3, true, false, &internalpb.MsgPosition{
MsgID: id2,
})
m.injectFlush(taskInjection{
injected: injected,
injectOver: injectOver,
postInjection: func(pack *segmentFlushPack) {
pack.segmentID = 3
},
}, 2)
ti = newTaskInjection(2, func(pack *segmentFlushPack) {
pack.segmentID = 4
})
m.injectFlush(ti, 2, 3)
go func() {
<-injected
injectOver <- true
<-ti.injected
ti.injectDone(true)
}()
m.flushDelData(nil, 2, &internalpb.MsgPosition{
MsgID: id,
})
m.flushDelData(nil, 3, &internalpb.MsgPosition{
MsgID: id2,
})
finish.Wait()
assert.EqualValues(t, size+1, counter.Load())
assert.EqualValues(t, 3, packs[size].segmentID)
assert.EqualValues(t, size+2, counter.Load())
assert.EqualValues(t, 4, packs[size].segmentID)
finish.Add(1)
rand.Read(id)
m.flushBufferData(nil, 2, false, false, &internalpb.MsgPosition{
MsgID: id,
})
ti = newTaskInjection(1, func(pack *segmentFlushPack) {
pack.segmentID = 5
})
go func() {
<-ti.injected
ti.injectDone(false) // inject fail, segment id shall not be changed to 5
}()
m.injectFlush(ti, 2)
m.flushDelData(nil, 2, &internalpb.MsgPosition{
MsgID: id,
})
finish.Wait()
assert.EqualValues(t, size+2, counter.Load())
assert.EqualValues(t, 3, packs[size+1].segmentID)
assert.EqualValues(t, size+3, counter.Load())
assert.EqualValues(t, 4, packs[size+1].segmentID)
}

View File

@ -54,7 +54,7 @@ type flushTaskRunner struct {
startSignal <-chan struct{}
finishSignal chan struct{}
injectSignal <-chan taskInjection
injectSignal <-chan *taskInjection
segmentID UniqueID
insertLogs map[UniqueID]string
@ -71,9 +71,45 @@ type flushTaskRunner struct {
type taskInjection struct {
injected chan struct{} // channel to notify injected
injectOver chan bool // indicates injection over
wg sync.WaitGroup
postInjection func(pack *segmentFlushPack)
}
func newTaskInjection(segmentCnt int, pf func(pack *segmentFlushPack)) *taskInjection {
ti := &taskInjection{
injected: make(chan struct{}),
injectOver: make(chan bool, segmentCnt),
postInjection: pf,
}
ti.wg.Add(segmentCnt)
return ti
}
// Injected returns a chan, which will be closed after pre set segments counts a injected
func (ti *taskInjection) Injected() <-chan struct{} {
return ti.injected
}
func (ti *taskInjection) waitForInjected() {
ti.wg.Wait()
close(ti.injected)
}
func (ti *taskInjection) injectOne() {
ti.wg.Done()
}
func (ti *taskInjection) injectDone(success bool) {
if !success {
close(ti.injectOver)
return
}
for i := 0; i < cap(ti.injectOver); i++ {
ti.injectOver <- true
}
}
// init initializes flushTaskRunner with provided actions and signal
func (t *flushTaskRunner) init(f notifyMetaFunc, postFunc taskPostFunc, signal <-chan struct{}) {
t.initOnce.Do(func() {
@ -136,7 +172,7 @@ func (t *flushTaskRunner) waitFinish(notifyFunc notifyMetaFunc, postFunc taskPos
select {
case injection := <-t.injectSignal:
// notify injected
injection.injected <- struct{}{}
injection.injectOne()
ok := <-injection.injectOver
if ok {
// apply postInjection func
@ -172,7 +208,7 @@ func (t *flushTaskRunner) getFlushPack() *segmentFlushPack {
}
// newFlushTaskRunner create a usable task runner
func newFlushTaskRunner(segmentID UniqueID, injectCh <-chan taskInjection) *flushTaskRunner {
func newFlushTaskRunner(segmentID UniqueID, injectCh <-chan *taskInjection) *flushTaskRunner {
t := &flushTaskRunner{
WaitGroup: sync.WaitGroup{},
segmentID: segmentID,

View File

@ -95,7 +95,7 @@ func TestFlushTaskRunner_FailError(t *testing.T) {
}
func TestFlushTaskRunner_Injection(t *testing.T) {
injectCh := make(chan taskInjection, 1)
injectCh := make(chan *taskInjection, 1)
task := newFlushTaskRunner(1, injectCh)
signal := make(chan struct{})
@ -103,21 +103,16 @@ func TestFlushTaskRunner_Injection(t *testing.T) {
nextFlag := false
processed := make(chan struct{})
injected := make(chan struct{})
injectOver := make(chan bool)
injectCh <- taskInjection{
injected: injected,
injectOver: injectOver,
postInjection: func(pack *segmentFlushPack) {
t.Log("task injection executed")
pack.segmentID = 2
},
}
ti := newTaskInjection(1, func(pack *segmentFlushPack) {
t.Log("task injection executed")
pack.segmentID = 2
})
go ti.waitForInjected()
injectCh <- ti
go func() {
<-injected
injectOver <- true
<-ti.injected
ti.injectDone(true)
}()
task.init(func(pack *segmentFlushPack) {