mirror of https://github.com/milvus-io/milvus.git
Add waitgroup for tsafe close logic (#8545)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/8556/head
parent
b2e5627061
commit
c72f074009
|
@ -70,6 +70,8 @@ type tSafe struct {
|
|||
watcherList []*tSafeWatcher
|
||||
tSafeChan chan tSafeMsg
|
||||
tSafeRecord map[UniqueID]Timestamp
|
||||
// waitgroup for closing control
|
||||
closeWg sync.WaitGroup
|
||||
}
|
||||
|
||||
func newTSafe(ctx context.Context, channel Channel) tSafer {
|
||||
|
@ -88,7 +90,9 @@ func newTSafe(ctx context.Context, channel Channel) tSafer {
|
|||
}
|
||||
|
||||
func (ts *tSafe) start() {
|
||||
ts.closeWg.Add(1)
|
||||
go func() {
|
||||
defer ts.closeWg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ts.ctx.Done():
|
||||
|
@ -164,6 +168,8 @@ func (ts *tSafe) close() {
|
|||
defer ts.tSafeMu.Unlock()
|
||||
|
||||
ts.cancel()
|
||||
// wait for all job done
|
||||
ts.closeWg.Wait()
|
||||
for _, watcher := range ts.watcherList {
|
||||
close(watcher.notifyChan)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue