mirror of https://github.com/milvus-io/milvus.git
enhance: [Cherry-pick] make sure stream closed (#29458)
relate: https://github.com/milvus-io/milvus/issues/28367 pr:https://github.com/milvus-io/milvus/pull/29456 --------- Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/29562/head
parent
2d517039b5
commit
5194240b08
|
@ -235,6 +235,8 @@ func (mgr *singleTypeChannelsMgr) createMsgStream(collectionID UniqueID) (msgstr
|
|||
zap.Strings("physical_channels", channelInfos.pchans))
|
||||
mgr.infos[collectionID] = streamInfos{channelInfos: channelInfos, stream: stream}
|
||||
incPChansMetrics(channelInfos.pchans)
|
||||
} else {
|
||||
stream.Close()
|
||||
}
|
||||
|
||||
return mgr.infos[collectionID].stream, nil
|
||||
|
|
|
@ -18,6 +18,7 @@ package proxy
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
|
@ -251,6 +252,43 @@ func Test_singleTypeChannelsMgr_createMsgStream(t *testing.T) {
|
|||
assert.NotNil(t, stream)
|
||||
})
|
||||
|
||||
t.Run("concurrent create", func(t *testing.T) {
|
||||
factory := newMockMsgStreamFactory()
|
||||
factory.f = func(ctx context.Context) (msgstream.MsgStream, error) {
|
||||
return newMockMsgStream(), nil
|
||||
}
|
||||
stopCh := make(chan struct{})
|
||||
readyCh := make(chan struct{})
|
||||
m := &singleTypeChannelsMgr{
|
||||
infos: make(map[UniqueID]streamInfos),
|
||||
getChannelsFunc: func(collectionID UniqueID) (channelInfos, error) {
|
||||
close(readyCh)
|
||||
<-stopCh
|
||||
return channelInfos{vchans: []string{"111", "222"}, pchans: []string{"111"}}, nil
|
||||
},
|
||||
msgStreamFactory: factory,
|
||||
repackFunc: nil,
|
||||
}
|
||||
|
||||
firstStream := streamInfos{stream: newMockMsgStream()}
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
stream, err := m.createMsgStream(100)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, stream)
|
||||
}()
|
||||
// make sure create msg stream has run at getchannels
|
||||
<-readyCh
|
||||
// mock create stream for same collection in same time.
|
||||
m.mu.Lock()
|
||||
m.infos[100] = firstStream
|
||||
m.mu.Unlock()
|
||||
|
||||
close(stopCh)
|
||||
wg.Wait()
|
||||
})
|
||||
t.Run("failed to get channels", func(t *testing.T) {
|
||||
m := &singleTypeChannelsMgr{
|
||||
getChannelsFunc: func(collectionID UniqueID) (channelInfos, error) {
|
||||
|
|
Loading…
Reference in New Issue