mirror of https://github.com/milvus-io/milvus.git
Refine RPC call in unwatch drop channel (#27864)
Signed-off-by: jaime <yun.zhang@zilliz.com>pull/27890/head
parent
323fc107a7
commit
6749957e71
|
@ -251,7 +251,7 @@ func (c *ChannelManager) unwatchDroppedChannels() {
|
|||
nodeChannels := c.store.GetChannels()
|
||||
for _, nodeChannel := range nodeChannels {
|
||||
for _, ch := range nodeChannel.Channels {
|
||||
if !c.h.CheckShouldDropChannel(ch.Name, ch.CollectionID) {
|
||||
if !c.h.CheckShouldDropChannel(ch.Name) {
|
||||
continue
|
||||
}
|
||||
err := c.remove(nodeChannel.NodeID, ch)
|
||||
|
@ -788,7 +788,7 @@ func (c *ChannelManager) Reassign(originNodeID UniqueID, channelName string) err
|
|||
c.mu.RUnlock()
|
||||
|
||||
reallocates := &NodeChannelInfo{originNodeID, []*channel{ch}}
|
||||
isDropped := c.isMarkedDrop(channelName, ch.CollectionID)
|
||||
isDropped := c.isMarkedDrop(channelName)
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
@ -843,7 +843,7 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
|
|||
}
|
||||
|
||||
reallocates := &NodeChannelInfo{nodeID, []*channel{chToCleanUp}}
|
||||
isDropped := c.isMarkedDrop(channelName, chToCleanUp.CollectionID)
|
||||
isDropped := c.isMarkedDrop(channelName)
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
@ -910,8 +910,8 @@ func (c *ChannelManager) getNodeIDByChannelName(chName string) (bool, UniqueID)
|
|||
return false, 0
|
||||
}
|
||||
|
||||
func (c *ChannelManager) isMarkedDrop(channelName string, collectionID UniqueID) bool {
|
||||
return c.h.CheckShouldDropChannel(channelName, collectionID)
|
||||
func (c *ChannelManager) isMarkedDrop(channelName string) bool {
|
||||
return c.h.CheckShouldDropChannel(channelName)
|
||||
}
|
||||
|
||||
func getReleaseOp(nodeID UniqueID, ch *channel) ChannelOpSet {
|
||||
|
|
|
@ -581,7 +581,7 @@ func TestChannelManager(t *testing.T) {
|
|||
collectionID := UniqueID(5)
|
||||
handler := NewNMockHandler(t)
|
||||
handler.EXPECT().
|
||||
CheckShouldDropChannel(mock.Anything, mock.Anything).
|
||||
CheckShouldDropChannel(mock.Anything).
|
||||
Return(true)
|
||||
handler.EXPECT().FinishDropChannel(mock.Anything).Return(nil)
|
||||
chManager, err := NewChannelManager(watchkv, handler)
|
||||
|
@ -603,8 +603,8 @@ func TestChannelManager(t *testing.T) {
|
|||
var err error
|
||||
handler := NewNMockHandler(t)
|
||||
handler.EXPECT().
|
||||
CheckShouldDropChannel(mock.Anything, mock.Anything).
|
||||
Run(func(channel string, collectionID int64) {
|
||||
CheckShouldDropChannel(mock.Anything).
|
||||
Run(func(channel string) {
|
||||
channels, err := chManager.store.Delete(1)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, len(channels))
|
||||
|
@ -628,8 +628,8 @@ func TestChannelManager(t *testing.T) {
|
|||
var err error
|
||||
handler := NewNMockHandler(t)
|
||||
handler.EXPECT().
|
||||
CheckShouldDropChannel(mock.Anything, mock.Anything).
|
||||
Run(func(channel string, collectionID int64) {
|
||||
CheckShouldDropChannel(mock.Anything).
|
||||
Run(func(channel string) {
|
||||
channels, err := chManager.store.Delete(1)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, len(channels))
|
||||
|
@ -659,7 +659,7 @@ func TestChannelManager(t *testing.T) {
|
|||
t.Run("test CleanupAndReassign with dropped channel", func(t *testing.T) {
|
||||
handler := NewNMockHandler(t)
|
||||
handler.EXPECT().
|
||||
CheckShouldDropChannel(mock.Anything, mock.Anything).
|
||||
CheckShouldDropChannel(mock.Anything).
|
||||
Return(true)
|
||||
handler.EXPECT().FinishDropChannel(mock.Anything).Return(nil)
|
||||
chManager, err := NewChannelManager(watchkv, handler)
|
||||
|
|
|
@ -39,8 +39,8 @@ type Handler interface {
|
|||
GetQueryVChanPositions(ch *channel, partitionIDs ...UniqueID) *datapb.VchannelInfo
|
||||
// GetDataVChanPositions gets the information recovery needed of a channel for DataNode
|
||||
GetDataVChanPositions(ch *channel, partitionID UniqueID) *datapb.VchannelInfo
|
||||
CheckShouldDropChannel(channel string, collectionID UniqueID) bool
|
||||
FinishDropChannel(channel string) error
|
||||
CheckShouldDropChannel(ch string) bool
|
||||
FinishDropChannel(ch string) error
|
||||
GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
|
||||
}
|
||||
|
||||
|
@ -403,20 +403,8 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID
|
|||
}
|
||||
|
||||
// CheckShouldDropChannel returns whether specified channel is marked to be removed
|
||||
func (h *ServerHandler) CheckShouldDropChannel(channel string, collectionID UniqueID) bool {
|
||||
if h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) {
|
||||
return true
|
||||
}
|
||||
// collectionID parse from channelName
|
||||
has, err := h.HasCollection(h.s.ctx, collectionID)
|
||||
if err != nil {
|
||||
log.Info("datacoord ServerHandler CheckShouldDropChannel hasCollection failed", zap.Error(err))
|
||||
return false
|
||||
}
|
||||
log.Info("datacoord ServerHandler CheckShouldDropChannel hasCollection", zap.Bool("shouldDropChannel", !has),
|
||||
zap.String("channel", channel))
|
||||
|
||||
return !has
|
||||
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
|
||||
return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel)
|
||||
}
|
||||
|
||||
// FinishDropChannel cleans up the remove flag for channels
|
||||
|
|
|
@ -22,13 +22,13 @@ func (_m *NMockHandler) EXPECT() *NMockHandler_Expecter {
|
|||
return &NMockHandler_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// CheckShouldDropChannel provides a mock function with given fields: channel, collectionID
|
||||
func (_m *NMockHandler) CheckShouldDropChannel(channel string, collectionID int64) bool {
|
||||
ret := _m.Called(channel, collectionID)
|
||||
// CheckShouldDropChannel provides a mock function with given fields: ch
|
||||
func (_m *NMockHandler) CheckShouldDropChannel(ch string) bool {
|
||||
ret := _m.Called(ch)
|
||||
|
||||
var r0 bool
|
||||
if rf, ok := ret.Get(0).(func(string, int64) bool); ok {
|
||||
r0 = rf(channel, collectionID)
|
||||
if rf, ok := ret.Get(0).(func(string) bool); ok {
|
||||
r0 = rf(ch)
|
||||
} else {
|
||||
r0 = ret.Get(0).(bool)
|
||||
}
|
||||
|
@ -42,15 +42,14 @@ type NMockHandler_CheckShouldDropChannel_Call struct {
|
|||
}
|
||||
|
||||
// CheckShouldDropChannel is a helper method to define mock.On call
|
||||
// - channel string
|
||||
// - collectionID int64
|
||||
func (_e *NMockHandler_Expecter) CheckShouldDropChannel(channel interface{}, collectionID interface{}) *NMockHandler_CheckShouldDropChannel_Call {
|
||||
return &NMockHandler_CheckShouldDropChannel_Call{Call: _e.mock.On("CheckShouldDropChannel", channel, collectionID)}
|
||||
// - ch string
|
||||
func (_e *NMockHandler_Expecter) CheckShouldDropChannel(ch interface{}) *NMockHandler_CheckShouldDropChannel_Call {
|
||||
return &NMockHandler_CheckShouldDropChannel_Call{Call: _e.mock.On("CheckShouldDropChannel", ch)}
|
||||
}
|
||||
|
||||
func (_c *NMockHandler_CheckShouldDropChannel_Call) Run(run func(channel string, collectionID int64)) *NMockHandler_CheckShouldDropChannel_Call {
|
||||
func (_c *NMockHandler_CheckShouldDropChannel_Call) Run(run func(ch string)) *NMockHandler_CheckShouldDropChannel_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(string), args[1].(int64))
|
||||
run(args[0].(string))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
@ -60,18 +59,18 @@ func (_c *NMockHandler_CheckShouldDropChannel_Call) Return(_a0 bool) *NMockHandl
|
|||
return _c
|
||||
}
|
||||
|
||||
func (_c *NMockHandler_CheckShouldDropChannel_Call) RunAndReturn(run func(string, int64) bool) *NMockHandler_CheckShouldDropChannel_Call {
|
||||
func (_c *NMockHandler_CheckShouldDropChannel_Call) RunAndReturn(run func(string) bool) *NMockHandler_CheckShouldDropChannel_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// FinishDropChannel provides a mock function with given fields: channel
|
||||
func (_m *NMockHandler) FinishDropChannel(channel string) error {
|
||||
ret := _m.Called(channel)
|
||||
// FinishDropChannel provides a mock function with given fields: ch
|
||||
func (_m *NMockHandler) FinishDropChannel(ch string) error {
|
||||
ret := _m.Called(ch)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(string) error); ok {
|
||||
r0 = rf(channel)
|
||||
r0 = rf(ch)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
@ -85,12 +84,12 @@ type NMockHandler_FinishDropChannel_Call struct {
|
|||
}
|
||||
|
||||
// FinishDropChannel is a helper method to define mock.On call
|
||||
// - channel string
|
||||
func (_e *NMockHandler_Expecter) FinishDropChannel(channel interface{}) *NMockHandler_FinishDropChannel_Call {
|
||||
return &NMockHandler_FinishDropChannel_Call{Call: _e.mock.On("FinishDropChannel", channel)}
|
||||
// - ch string
|
||||
func (_e *NMockHandler_Expecter) FinishDropChannel(ch interface{}) *NMockHandler_FinishDropChannel_Call {
|
||||
return &NMockHandler_FinishDropChannel_Call{Call: _e.mock.On("FinishDropChannel", ch)}
|
||||
}
|
||||
|
||||
func (_c *NMockHandler_FinishDropChannel_Call) Run(run func(channel string)) *NMockHandler_FinishDropChannel_Call {
|
||||
func (_c *NMockHandler_FinishDropChannel_Call) Run(run func(ch string)) *NMockHandler_FinishDropChannel_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(string))
|
||||
})
|
||||
|
|
|
@ -784,7 +784,7 @@ func (h *mockHandler) GetDataVChanPositions(channel *channel, partitionID Unique
|
|||
}
|
||||
}
|
||||
|
||||
func (h *mockHandler) CheckShouldDropChannel(channel string, collectionID UniqueID) bool {
|
||||
func (h *mockHandler) CheckShouldDropChannel(channel string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/blang/semver/v4"
|
||||
semver "github.com/blang/semver/v4"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/tikv/client-go/v2/txnkv"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
|
|
|
@ -1905,24 +1905,6 @@ func TestGetChannelSeekPosition(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestDescribeCollection(t *testing.T) {
|
||||
t.Run("TestNotExistCollections", func(t *testing.T) {
|
||||
svr := newTestServer(t, nil)
|
||||
defer closeTestServer(t, svr)
|
||||
has, err := svr.handler.(*ServerHandler).HasCollection(context.TODO(), -1)
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, has)
|
||||
})
|
||||
|
||||
t.Run("TestExistCollections", func(t *testing.T) {
|
||||
svr := newTestServer(t, nil)
|
||||
defer closeTestServer(t, svr)
|
||||
has, err := svr.handler.(*ServerHandler).HasCollection(context.TODO(), 1314)
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, has)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetDataVChanPositions(t *testing.T) {
|
||||
svr := newTestServer(t, nil)
|
||||
defer closeTestServer(t, svr)
|
||||
|
@ -2463,12 +2445,7 @@ func TestShouldDropChannel(t *testing.T) {
|
|||
Count: 1,
|
||||
}, nil)
|
||||
|
||||
var crt rootCoordCreatorFunc = func(ctx context.Context, metaRoot string, etcdClient *clientv3.Client) (types.RootCoordClient, error) {
|
||||
return myRoot, nil
|
||||
}
|
||||
|
||||
opt := WithRootCoordCreator(crt)
|
||||
svr := newTestServer(t, nil, opt)
|
||||
svr := newTestServer(t, nil)
|
||||
defer closeTestServer(t, svr)
|
||||
schema := newTestSchema()
|
||||
svr.meta.AddCollection(&collectionInfo{
|
||||
|
@ -2492,52 +2469,14 @@ func TestShouldDropChannel(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
t.Run("channel name not in kv, collection not exist", func(t *testing.T) {
|
||||
// myRoot.code = commonpb.ErrorCode_CollectionNotExists
|
||||
myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
|
||||
Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: merr.Status(merr.WrapErrCollectionNotFound(-1)),
|
||||
CollectionID: -1,
|
||||
}, nil).Once()
|
||||
assert.True(t, svr.handler.CheckShouldDropChannel("ch99", -1))
|
||||
t.Run("channel name not in kv ", func(t *testing.T) {
|
||||
assert.False(t, svr.handler.CheckShouldDropChannel("ch99"))
|
||||
})
|
||||
|
||||
t.Run("channel name not in kv, collection exist", func(t *testing.T) {
|
||||
myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
|
||||
Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: merr.Success(),
|
||||
CollectionID: 0,
|
||||
}, nil).Once()
|
||||
assert.False(t, svr.handler.CheckShouldDropChannel("ch99", 0))
|
||||
})
|
||||
|
||||
t.Run("collection name in kv, collection exist", func(t *testing.T) {
|
||||
myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
|
||||
Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: merr.Success(),
|
||||
CollectionID: 0,
|
||||
}, nil).Once()
|
||||
assert.False(t, svr.handler.CheckShouldDropChannel("ch1", 0))
|
||||
})
|
||||
|
||||
t.Run("collection name in kv, collection not exist", func(t *testing.T) {
|
||||
myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
|
||||
Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: merr.Status(merr.WrapErrCollectionNotFound(-1)),
|
||||
CollectionID: -1,
|
||||
}, nil).Once()
|
||||
assert.True(t, svr.handler.CheckShouldDropChannel("ch1", -1))
|
||||
})
|
||||
|
||||
t.Run("channel in remove flag, collection exist", func(t *testing.T) {
|
||||
t.Run("channel in remove flag", func(t *testing.T) {
|
||||
err := svr.meta.catalog.MarkChannelDeleted(context.TODO(), "ch1")
|
||||
require.NoError(t, err)
|
||||
myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
|
||||
Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: merr.Success(),
|
||||
CollectionID: 0,
|
||||
}, nil).Once()
|
||||
assert.True(t, svr.handler.CheckShouldDropChannel("ch1", 0))
|
||||
assert.True(t, svr.handler.CheckShouldDropChannel("ch1"))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ type MockFactory_NewMsgStream_Call struct {
|
|||
}
|
||||
|
||||
// NewMsgStream is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - ctx context.Context
|
||||
func (_e *MockFactory_Expecter) NewMsgStream(ctx interface{}) *MockFactory_NewMsgStream_Call {
|
||||
return &MockFactory_NewMsgStream_Call{Call: _e.mock.On("NewMsgStream", ctx)}
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ type MockFactory_NewMsgStreamDisposer_Call struct {
|
|||
}
|
||||
|
||||
// NewMsgStreamDisposer is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - ctx context.Context
|
||||
func (_e *MockFactory_Expecter) NewMsgStreamDisposer(ctx interface{}) *MockFactory_NewMsgStreamDisposer_Call {
|
||||
return &MockFactory_NewMsgStreamDisposer_Call{Call: _e.mock.On("NewMsgStreamDisposer", ctx)}
|
||||
}
|
||||
|
@ -151,7 +151,7 @@ type MockFactory_NewTtMsgStream_Call struct {
|
|||
}
|
||||
|
||||
// NewTtMsgStream is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - ctx context.Context
|
||||
func (_e *MockFactory_Expecter) NewTtMsgStream(ctx interface{}) *MockFactory_NewTtMsgStream_Call {
|
||||
return &MockFactory_NewTtMsgStream_Call{Call: _e.mock.On("NewTtMsgStream", ctx)}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue