From 72f454e0541a93c13733768c9cf0e0b58190dadb Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Tue, 31 May 2022 22:58:02 +0800 Subject: [PATCH] Fix 16961, make creating dml stream idempotent (#17302) Signed-off-by: longjiquan --- internal/proxy/channels_mgr.go | 10 ++++++++++ internal/proxy/channels_mgr_test.go | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/internal/proxy/channels_mgr.go b/internal/proxy/channels_mgr.go index 233d17c345..048c604cf3 100644 --- a/internal/proxy/channels_mgr.go +++ b/internal/proxy/channels_mgr.go @@ -329,7 +329,17 @@ func (mgr *singleTypeChannelsMgr) getVChannels(collectionID UniqueID) ([]vChan, return nil, err } +func (mgr *singleTypeChannelsMgr) streamExist(collectionID UniqueID) bool { + stream, err := mgr.getStream(collectionID) + return err == nil && stream != nil +} + func (mgr *singleTypeChannelsMgr) createMsgStream(collectionID UniqueID) error { + if mgr.streamExist(collectionID) { + log.Info("stream already exist, no need to re-create", zap.Int64("collection_id", collectionID)) + return nil + } + channels, err := mgr.getChannelsFunc(collectionID) if err != nil { log.Warn("failed to create message stream", diff --git a/internal/proxy/channels_mgr_test.go b/internal/proxy/channels_mgr_test.go index e26a8fbd0b..a31b25a092 100644 --- a/internal/proxy/channels_mgr_test.go +++ b/internal/proxy/channels_mgr_test.go @@ -74,6 +74,10 @@ func TestChannelsMgrImpl_createDMLMsgStream(t *testing.T) { err = mgr.createDMLMsgStream(collID) assert.Equal(t, nil, err) + // re-create message stream. + err = mgr.createDMLMsgStream(collID) + assert.Equal(t, nil, err) + _, err = mgr.getChannels(collID) assert.Equal(t, nil, err) _, err = mgr.getVChannels(collID)