Add close for flushManager (#11874)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/11953/head
congqixia 2021-11-17 10:39:15 +08:00 committed by GitHub
parent cf07b46bca
commit 5e550a8793
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 5 deletions

View File

@ -559,3 +559,5 @@ func (mfm *mockFlushManager) injectFlush(injection taskInjection, segments ...Un
<-injection.injectOver
}()
}
func (mfm *mockFlushManager) close() {}

View File

@ -125,6 +125,7 @@ func (dsService *dataSyncService) close() {
}
dsService.cancelFn()
dsService.flushManager.close()
}
// initNodes inits a TimetickedFlowGraph

View File

@ -42,6 +42,8 @@ type flushManager interface {
flushDelData(data *DelDataBuf, segmentID UniqueID, pos *internalpb.MsgPosition) error
// injectFlush injects compaction or other blocking task before flush sync
injectFlush(injection taskInjection, segments ...UniqueID)
// close handles resource clean up
close()
}
// segmentFlushPack contains result to save into meta
@ -93,13 +95,13 @@ func newOrderFlushQueue(segID UniqueID, f notifyMetaFunc) *orderFlushQueue {
notifyFunc: f,
injectCh: make(chan taskInjection, 100),
}
q.injectHandler = newInjectHandler(q)
return q
}
// init orderFlushQueue use once protect init, init tailCh
func (q *orderFlushQueue) init() {
q.Once.Do(func() {
q.injectHandler = newInjectHandler(q)
// new queue acts like tailing task is done
q.tailCh = make(chan struct{})
close(q.tailCh)
@ -214,12 +216,11 @@ type rendezvousFlushManager struct {
// getFlushQueue
func (m *rendezvousFlushManager) getFlushQueue(segmentID UniqueID) *orderFlushQueue {
actual, loaded := m.dispatcher.LoadOrStore(segmentID, newOrderFlushQueue(segmentID, m.notifyFunc))
newQueue := newOrderFlushQueue(segmentID, m.notifyFunc)
actual, _ := m.dispatcher.LoadOrStore(segmentID, newQueue)
// all operation on dispatcher is private, assertion ok guaranteed
queue := actual.(*orderFlushQueue)
if !loaded {
queue.init()
}
queue.init()
return queue
}
@ -374,6 +375,20 @@ func (m *rendezvousFlushManager) getSegmentMeta(segmentID UniqueID, pos *interna
return collID, partID, meta, nil
}
// close cleans up all the left members
func (m *rendezvousFlushManager) close() {
m.dispatcher.Range(func(k, v interface{}) bool {
//assertion ok
queue := v.(*orderFlushQueue)
queue.injectMut.Lock()
if queue.injectHandler != nil {
queue.injectHandler.close()
}
queue.injectMut.Unlock()
return true
})
}
type flushBufferInsertTask struct {
kv.BaseKV
data map[string]string

View File

@ -280,6 +280,43 @@ func TestRendezvousFlushManager_getSegmentMeta(t *testing.T) {
assert.Error(t, err)
}
func TestRendezvousFlushManager_close(t *testing.T) {
kv := memkv.NewMemoryKV()
size := 1000
var counter atomic.Int64
finish := sync.WaitGroup{}
finish.Add(size)
m := NewRendezvousFlushManager(&allocator{}, kv, newMockReplica(), func(pack *segmentFlushPack) {
counter.Inc()
finish.Done()
})
ids := make([][]byte, 0, size)
for i := 0; i < size; i++ {
id := make([]byte, 10)
rand.Read(id)
ids = append(ids, id)
}
wg := sync.WaitGroup{}
wg.Add(size)
for i := 0; i < size; i++ {
m.flushDelData(nil, 1, &internalpb.MsgPosition{
MsgID: ids[i],
})
m.flushBufferData(nil, 1, true, false, &internalpb.MsgPosition{
MsgID: ids[i],
})
wg.Done()
}
wg.Wait()
finish.Wait()
m.close()
assert.EqualValues(t, size, counter.Load())
}
func TestFlushNotifyFunc(t *testing.T) {
// replica :=
// rcf := &RootCoordFactory{}