From fbc8fb3cb2967de6e460dbc87b33792447f33f3a Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 21 Jun 2024 10:24:12 +0800 Subject: [PATCH] enhance: Skip return data distribution if no change happen (#32814) (#33985) issue: #32813 pr: #32814 --------- Signed-off-by: Wei Liu --- Makefile | 1 + internal/proto/query_coord.proto | 2 + .../querycoordv2/dist/dist_controller_test.go | 25 +- internal/querycoordv2/dist/dist_handler.go | 66 +- .../querycoordv2/dist/dist_handler_test.go | 127 +++ .../querycoordv2/meta/mock_target_manager.go | 975 ++++++++++++++++++ internal/querycoordv2/meta/target_manager.go | 23 + internal/querynodev2/server.go | 4 + internal/querynodev2/services.go | 49 +- 9 files changed, 1231 insertions(+), 41 deletions(-) create mode 100644 internal/querycoordv2/dist/dist_handler_test.go create mode 100644 internal/querycoordv2/meta/mock_target_manager.go diff --git a/Makefile b/Makefile index c1bfc9e901..2bb8ca7728 100644 --- a/Makefile +++ b/Makefile @@ -429,6 +429,7 @@ generate-mockery-proxy: getdeps generate-mockery-querycoord: getdeps $(INSTALL_PATH)/mockery --name=QueryNodeServer --dir=$(PWD)/internal/proto/querypb/ --output=$(PWD)/internal/querycoordv2/mocks --filename=mock_querynode.go --with-expecter --structname=MockQueryNodeServer $(INSTALL_PATH)/mockery --name=Broker --dir=$(PWD)/internal/querycoordv2/meta --output=$(PWD)/internal/querycoordv2/meta --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=meta + $(INSTALL_PATH)/mockery --name=TargetManagerInterface --dir=$(PWD)/internal/querycoordv2/meta --output=$(PWD)/internal/querycoordv2/meta --filename=mock_target_manager.go --with-expecter --structname=MockTargetManager --inpackage $(INSTALL_PATH)/mockery --name=Scheduler --dir=$(PWD)/internal/querycoordv2/task --output=$(PWD)/internal/querycoordv2/task --filename=mock_scheduler.go --with-expecter --structname=MockScheduler --outpkg=task --inpackage $(INSTALL_PATH)/mockery --name=Cluster --dir=$(PWD)/internal/querycoordv2/session --output=$(PWD)/internal/querycoordv2/session --filename=mock_cluster.go --with-expecter --structname=MockCluster --outpkg=session --inpackage $(INSTALL_PATH)/mockery --name=Balance --dir=$(PWD)/internal/querycoordv2/balance --output=$(PWD)/internal/querycoordv2/balance --filename=mock_balancer.go --with-expecter --structname=MockBalancer --outpkg=balance --inpackage diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index 2f5facab9d..808c94d912 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -589,6 +589,7 @@ message SealedSegmentsChangeInfo { message GetDataDistributionRequest { common.MsgBase base = 1; map checkpoints = 2; + int64 lastUpdateTs = 3; } message GetDataDistributionResponse { @@ -597,6 +598,7 @@ message GetDataDistributionResponse { repeated SegmentVersionInfo segments = 3; repeated ChannelVersionInfo channels = 4; repeated LeaderView leader_views = 5; + int64 lastModifyTs = 6; } message LeaderView { diff --git a/internal/querycoordv2/dist/dist_controller_test.go b/internal/querycoordv2/dist/dist_controller_test.go index d0ee50fad5..8ecaa0e410 100644 --- a/internal/querycoordv2/dist/dist_controller_test.go +++ b/internal/querycoordv2/dist/dist_controller_test.go @@ -48,6 +48,8 @@ type DistControllerTestSuite struct { kv kv.MetaKv meta *meta.Meta broker *meta.MockBroker + + nodeMgr *session.NodeManager } func (suite *DistControllerTestSuite) SetupTest() { @@ -69,16 +71,17 @@ func (suite *DistControllerTestSuite) SetupTest() { // meta store := querycoord.NewCatalog(suite.kv) idAllocator := RandomIncrementIDAllocator() - suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager()) + + suite.nodeMgr = session.NewNodeManager() + suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr) suite.mockCluster = session.NewMockCluster(suite.T()) - nodeManager := session.NewNodeManager() distManager := meta.NewDistributionManager() suite.broker = meta.NewMockBroker(suite.T()) targetManager := meta.NewTargetManager(suite.broker, suite.meta) suite.mockScheduler = task.NewMockScheduler(suite.T()) suite.mockScheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe() - suite.controller = NewDistController(suite.mockCluster, nodeManager, distManager, targetManager, suite.mockScheduler) + suite.controller = NewDistController(suite.mockCluster, suite.nodeMgr, distManager, targetManager, suite.mockScheduler) } func (suite *DistControllerTestSuite) TearDownSuite() { @@ -86,6 +89,11 @@ func (suite *DistControllerTestSuite) TearDownSuite() { } func (suite *DistControllerTestSuite) TestStart() { + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) dispatchCalled := atomic.NewBool(false) suite.mockCluster.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return( &querypb.GetDataDistributionResponse{Status: merr.Success(), NodeID: 1}, @@ -134,6 +142,17 @@ func (suite *DistControllerTestSuite) TestStop() { } func (suite *DistControllerTestSuite) TestSyncAll() { + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) suite.controller.StartDistInstance(context.TODO(), 1) suite.controller.StartDistInstance(context.TODO(), 2) diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 1729186718..4a5e8f92b8 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -26,7 +26,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" @@ -40,16 +39,17 @@ import ( ) type distHandler struct { - nodeID int64 - c chan struct{} - wg sync.WaitGroup - client session.Cluster - nodeManager *session.NodeManager - scheduler task.Scheduler - dist *meta.DistributionManager - target *meta.TargetManager - mu sync.Mutex - stopOnce sync.Once + nodeID int64 + c chan struct{} + wg sync.WaitGroup + client session.Cluster + nodeManager *session.NodeManager + scheduler task.Scheduler + dist *meta.DistributionManager + target meta.TargetManagerInterface + mu sync.Mutex + stopOnce sync.Once + lastUpdateTs int64 } func (dh *distHandler) start(ctx context.Context) { @@ -103,21 +103,31 @@ func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse, dispatchTask bool) { node := dh.nodeManager.Get(resp.GetNodeID()) - if node != nil { + if node == nil { + return + } + + if time.Since(node.LastHeartbeat()) > paramtable.Get().QueryCoordCfg.HeartBeatWarningLag.GetAsDuration(time.Millisecond) { + log.Warn("node last heart beat time lag too behind", zap.Time("now", time.Now()), + zap.Time("lastHeartBeatTime", node.LastHeartbeat()), zap.Int64("nodeID", node.ID())) + } + node.SetLastHeartbeat(time.Now()) + + // skip update dist if no distribution change happens in query node + if resp.GetLastModifyTs() != 0 && resp.GetLastModifyTs() <= dh.lastUpdateTs { + log.RatedInfo(30, "skip update dist due to no distribution change", zap.Int64("lastModifyTs", resp.GetLastModifyTs()), zap.Int64("lastUpdateTs", dh.lastUpdateTs)) + } else { + dh.lastUpdateTs = resp.GetLastModifyTs() + node.UpdateStats( session.WithSegmentCnt(len(resp.GetSegments())), session.WithChannelCnt(len(resp.GetChannels())), ) - if time.Since(node.LastHeartbeat()) > paramtable.Get().QueryCoordCfg.HeartBeatWarningLag.GetAsDuration(time.Millisecond) { - log.Warn("node last heart beat time lag too behind", zap.Time("now", time.Now()), - zap.Time("lastHeartBeatTime", node.LastHeartbeat()), zap.Int64("nodeID", node.ID())) - } - node.SetLastHeartbeat(time.Now()) - } - dh.updateSegmentsDistribution(resp) - dh.updateChannelsDistribution(resp) - dh.updateLeaderView(resp) + dh.updateSegmentsDistribution(resp) + dh.updateChannelsDistribution(resp) + dh.updateLeaderView(resp) + } if dispatchTask { dh.scheduler.Dispatch(dh.nodeID) @@ -232,23 +242,13 @@ func (dh *distHandler) getDistribution(ctx context.Context) (*querypb.GetDataDis dh.mu.Lock() defer dh.mu.Unlock() - channels := make(map[string]*msgpb.MsgPosition) - for _, channel := range dh.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(dh.nodeID)) { - targetChannel := dh.target.GetDmChannel(channel.GetCollectionID(), channel.GetChannelName(), meta.CurrentTarget) - if targetChannel == nil { - continue - } - - channels[channel.GetChannelName()] = targetChannel.GetSeekPosition() - } - ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.DistributionRequestTimeout.GetAsDuration(time.Millisecond)) defer cancel() resp, err := dh.client.GetDataDistribution(ctx, dh.nodeID, &querypb.GetDataDistributionRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_GetDistribution), ), - Checkpoints: channels, + LastUpdateTs: dh.lastUpdateTs, }) if err != nil { return nil, err @@ -277,7 +277,7 @@ func newDistHandler( nodeManager *session.NodeManager, scheduler task.Scheduler, dist *meta.DistributionManager, - targetMgr *meta.TargetManager, + targetMgr meta.TargetManagerInterface, ) *distHandler { h := &distHandler{ nodeID: nodeID, diff --git a/internal/querycoordv2/dist/dist_handler_test.go b/internal/querycoordv2/dist/dist_handler_test.go new file mode 100644 index 0000000000..99b2ad47b6 --- /dev/null +++ b/internal/querycoordv2/dist/dist_handler_test.go @@ -0,0 +1,127 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dist + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +type DistHandlerSuite struct { + suite.Suite + + ctx context.Context + meta *meta.Meta + broker *meta.MockBroker + + nodeID int64 + client *session.MockCluster + nodeManager *session.NodeManager + scheduler *task.MockScheduler + dist *meta.DistributionManager + target *meta.MockTargetManager + + handler *distHandler +} + +func (suite *DistHandlerSuite) SetupSuite() { + paramtable.Init() + suite.nodeID = 1 + suite.client = session.NewMockCluster(suite.T()) + suite.nodeManager = session.NewNodeManager() + suite.scheduler = task.NewMockScheduler(suite.T()) + suite.dist = meta.NewDistributionManager() + + suite.target = meta.NewMockTargetManager(suite.T()) + suite.ctx = context.Background() + + suite.scheduler.EXPECT().Dispatch(mock.Anything).Maybe() + suite.scheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe() + suite.target.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + suite.target.EXPECT().GetDmChannel(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() +} + +func (suite *DistHandlerSuite) TestBasic() { + suite.nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.client.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{ + Status: merr.Success(), + NodeID: 1, + Channels: []*querypb.ChannelVersionInfo{ + { + Channel: "test-channel-1", + Collection: 1, + Version: 1, + }, + }, + Segments: []*querypb.SegmentVersionInfo{ + { + ID: 1, + Collection: 1, + Partition: 1, + Channel: "test-channel-1", + Version: 1, + }, + }, + + LeaderViews: []*querypb.LeaderView{ + { + Collection: 1, + Channel: "test-channel-1", + }, + }, + LastModifyTs: 1, + }, nil) + + suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target) + defer suite.handler.stop() + + time.Sleep(10 * time.Second) +} + +func (suite *DistHandlerSuite) TestGetDistributionFailed() { + suite.nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.client.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fake error")) + + suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target) + defer suite.handler.stop() + + time.Sleep(10 * time.Second) +} + +func TestDistHandlerSuite(t *testing.T) { + suite.Run(t, new(DistHandlerSuite)) +} diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go new file mode 100644 index 0000000000..5728fd2903 --- /dev/null +++ b/internal/querycoordv2/meta/mock_target_manager.go @@ -0,0 +1,975 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. + +package meta + +import ( + metastore "github.com/milvus-io/milvus/internal/metastore" + datapb "github.com/milvus-io/milvus/internal/proto/datapb" + + mock "github.com/stretchr/testify/mock" + + typeutil "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +// MockTargetManager is an autogenerated mock type for the TargetManagerInterface type +type MockTargetManager struct { + mock.Mock +} + +type MockTargetManager_Expecter struct { + mock *mock.Mock +} + +func (_m *MockTargetManager) EXPECT() *MockTargetManager_Expecter { + return &MockTargetManager_Expecter{mock: &_m.Mock} +} + +// GetCollectionTargetVersion provides a mock function with given fields: collectionID, scope +func (_m *MockTargetManager) GetCollectionTargetVersion(collectionID int64, scope int32) int64 { + ret := _m.Called(collectionID, scope) + + var r0 int64 + if rf, ok := ret.Get(0).(func(int64, int32) int64); ok { + r0 = rf(collectionID, scope) + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// MockTargetManager_GetCollectionTargetVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollectionTargetVersion' +type MockTargetManager_GetCollectionTargetVersion_Call struct { + *mock.Call +} + +// GetCollectionTargetVersion is a helper method to define mock.On call +// - collectionID int64 +// - scope int32 +func (_e *MockTargetManager_Expecter) GetCollectionTargetVersion(collectionID interface{}, scope interface{}) *MockTargetManager_GetCollectionTargetVersion_Call { + return &MockTargetManager_GetCollectionTargetVersion_Call{Call: _e.mock.On("GetCollectionTargetVersion", collectionID, scope)} +} + +func (_c *MockTargetManager_GetCollectionTargetVersion_Call) Run(run func(collectionID int64, scope int32)) *MockTargetManager_GetCollectionTargetVersion_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(int32)) + }) + return _c +} + +func (_c *MockTargetManager_GetCollectionTargetVersion_Call) Return(_a0 int64) *MockTargetManager_GetCollectionTargetVersion_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_GetCollectionTargetVersion_Call) RunAndReturn(run func(int64, int32) int64) *MockTargetManager_GetCollectionTargetVersion_Call { + _c.Call.Return(run) + return _c +} + +// GetDmChannel provides a mock function with given fields: collectionID, channel, scope +func (_m *MockTargetManager) GetDmChannel(collectionID int64, channel string, scope int32) *DmChannel { + ret := _m.Called(collectionID, channel, scope) + + var r0 *DmChannel + if rf, ok := ret.Get(0).(func(int64, string, int32) *DmChannel); ok { + r0 = rf(collectionID, channel, scope) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*DmChannel) + } + } + + return r0 +} + +// MockTargetManager_GetDmChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDmChannel' +type MockTargetManager_GetDmChannel_Call struct { + *mock.Call +} + +// GetDmChannel is a helper method to define mock.On call +// - collectionID int64 +// - channel string +// - scope int32 +func (_e *MockTargetManager_Expecter) GetDmChannel(collectionID interface{}, channel interface{}, scope interface{}) *MockTargetManager_GetDmChannel_Call { + return &MockTargetManager_GetDmChannel_Call{Call: _e.mock.On("GetDmChannel", collectionID, channel, scope)} +} + +func (_c *MockTargetManager_GetDmChannel_Call) Run(run func(collectionID int64, channel string, scope int32)) *MockTargetManager_GetDmChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(string), args[2].(int32)) + }) + return _c +} + +func (_c *MockTargetManager_GetDmChannel_Call) Return(_a0 *DmChannel) *MockTargetManager_GetDmChannel_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_GetDmChannel_Call) RunAndReturn(run func(int64, string, int32) *DmChannel) *MockTargetManager_GetDmChannel_Call { + _c.Call.Return(run) + return _c +} + +// GetDmChannelsByCollection provides a mock function with given fields: collectionID, scope +func (_m *MockTargetManager) GetDmChannelsByCollection(collectionID int64, scope int32) map[string]*DmChannel { + ret := _m.Called(collectionID, scope) + + var r0 map[string]*DmChannel + if rf, ok := ret.Get(0).(func(int64, int32) map[string]*DmChannel); ok { + r0 = rf(collectionID, scope) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]*DmChannel) + } + } + + return r0 +} + +// MockTargetManager_GetDmChannelsByCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDmChannelsByCollection' +type MockTargetManager_GetDmChannelsByCollection_Call struct { + *mock.Call +} + +// GetDmChannelsByCollection is a helper method to define mock.On call +// - collectionID int64 +// - scope int32 +func (_e *MockTargetManager_Expecter) GetDmChannelsByCollection(collectionID interface{}, scope interface{}) *MockTargetManager_GetDmChannelsByCollection_Call { + return &MockTargetManager_GetDmChannelsByCollection_Call{Call: _e.mock.On("GetDmChannelsByCollection", collectionID, scope)} +} + +func (_c *MockTargetManager_GetDmChannelsByCollection_Call) Run(run func(collectionID int64, scope int32)) *MockTargetManager_GetDmChannelsByCollection_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(int32)) + }) + return _c +} + +func (_c *MockTargetManager_GetDmChannelsByCollection_Call) Return(_a0 map[string]*DmChannel) *MockTargetManager_GetDmChannelsByCollection_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_GetDmChannelsByCollection_Call) RunAndReturn(run func(int64, int32) map[string]*DmChannel) *MockTargetManager_GetDmChannelsByCollection_Call { + _c.Call.Return(run) + return _c +} + +// GetDroppedSegmentsByChannel provides a mock function with given fields: collectionID, channelName, scope +func (_m *MockTargetManager) GetDroppedSegmentsByChannel(collectionID int64, channelName string, scope int32) []int64 { + ret := _m.Called(collectionID, channelName, scope) + + var r0 []int64 + if rf, ok := ret.Get(0).(func(int64, string, int32) []int64); ok { + r0 = rf(collectionID, channelName, scope) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } + } + + return r0 +} + +// MockTargetManager_GetDroppedSegmentsByChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDroppedSegmentsByChannel' +type MockTargetManager_GetDroppedSegmentsByChannel_Call struct { + *mock.Call +} + +// GetDroppedSegmentsByChannel is a helper method to define mock.On call +// - collectionID int64 +// - channelName string +// - scope int32 +func (_e *MockTargetManager_Expecter) GetDroppedSegmentsByChannel(collectionID interface{}, channelName interface{}, scope interface{}) *MockTargetManager_GetDroppedSegmentsByChannel_Call { + return &MockTargetManager_GetDroppedSegmentsByChannel_Call{Call: _e.mock.On("GetDroppedSegmentsByChannel", collectionID, channelName, scope)} +} + +func (_c *MockTargetManager_GetDroppedSegmentsByChannel_Call) Run(run func(collectionID int64, channelName string, scope int32)) *MockTargetManager_GetDroppedSegmentsByChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(string), args[2].(int32)) + }) + return _c +} + +func (_c *MockTargetManager_GetDroppedSegmentsByChannel_Call) Return(_a0 []int64) *MockTargetManager_GetDroppedSegmentsByChannel_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_GetDroppedSegmentsByChannel_Call) RunAndReturn(run func(int64, string, int32) []int64) *MockTargetManager_GetDroppedSegmentsByChannel_Call { + _c.Call.Return(run) + return _c +} + +// GetGrowingSegmentsByChannel provides a mock function with given fields: collectionID, channelName, scope +func (_m *MockTargetManager) GetGrowingSegmentsByChannel(collectionID int64, channelName string, scope int32) typeutil.Set[int64] { + ret := _m.Called(collectionID, channelName, scope) + + var r0 typeutil.Set[int64] + if rf, ok := ret.Get(0).(func(int64, string, int32) typeutil.Set[int64]); ok { + r0 = rf(collectionID, channelName, scope) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(typeutil.Set[int64]) + } + } + + return r0 +} + +// MockTargetManager_GetGrowingSegmentsByChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetGrowingSegmentsByChannel' +type MockTargetManager_GetGrowingSegmentsByChannel_Call struct { + *mock.Call +} + +// GetGrowingSegmentsByChannel is a helper method to define mock.On call +// - collectionID int64 +// - channelName string +// - scope int32 +func (_e *MockTargetManager_Expecter) GetGrowingSegmentsByChannel(collectionID interface{}, channelName interface{}, scope interface{}) *MockTargetManager_GetGrowingSegmentsByChannel_Call { + return &MockTargetManager_GetGrowingSegmentsByChannel_Call{Call: _e.mock.On("GetGrowingSegmentsByChannel", collectionID, channelName, scope)} +} + +func (_c *MockTargetManager_GetGrowingSegmentsByChannel_Call) Run(run func(collectionID int64, channelName string, scope int32)) *MockTargetManager_GetGrowingSegmentsByChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(string), args[2].(int32)) + }) + return _c +} + +func (_c *MockTargetManager_GetGrowingSegmentsByChannel_Call) Return(_a0 typeutil.Set[int64]) *MockTargetManager_GetGrowingSegmentsByChannel_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_GetGrowingSegmentsByChannel_Call) RunAndReturn(run func(int64, string, int32) typeutil.Set[int64]) *MockTargetManager_GetGrowingSegmentsByChannel_Call { + _c.Call.Return(run) + return _c +} + +// GetGrowingSegmentsByCollection provides a mock function with given fields: collectionID, scope +func (_m *MockTargetManager) GetGrowingSegmentsByCollection(collectionID int64, scope int32) typeutil.Set[int64] { + ret := _m.Called(collectionID, scope) + + var r0 typeutil.Set[int64] + if rf, ok := ret.Get(0).(func(int64, int32) typeutil.Set[int64]); ok { + r0 = rf(collectionID, scope) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(typeutil.Set[int64]) + } + } + + return r0 +} + +// MockTargetManager_GetGrowingSegmentsByCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetGrowingSegmentsByCollection' +type MockTargetManager_GetGrowingSegmentsByCollection_Call struct { + *mock.Call +} + +// GetGrowingSegmentsByCollection is a helper method to define mock.On call +// - collectionID int64 +// - scope int32 +func (_e *MockTargetManager_Expecter) GetGrowingSegmentsByCollection(collectionID interface{}, scope interface{}) *MockTargetManager_GetGrowingSegmentsByCollection_Call { + return &MockTargetManager_GetGrowingSegmentsByCollection_Call{Call: _e.mock.On("GetGrowingSegmentsByCollection", collectionID, scope)} +} + +func (_c *MockTargetManager_GetGrowingSegmentsByCollection_Call) Run(run func(collectionID int64, scope int32)) *MockTargetManager_GetGrowingSegmentsByCollection_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(int32)) + }) + return _c +} + +func (_c *MockTargetManager_GetGrowingSegmentsByCollection_Call) Return(_a0 typeutil.Set[int64]) *MockTargetManager_GetGrowingSegmentsByCollection_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_GetGrowingSegmentsByCollection_Call) RunAndReturn(run func(int64, int32) typeutil.Set[int64]) *MockTargetManager_GetGrowingSegmentsByCollection_Call { + _c.Call.Return(run) + return _c +} + +// GetSealedSegment provides a mock function with given fields: collectionID, id, scope +func (_m *MockTargetManager) GetSealedSegment(collectionID int64, id int64, scope int32) *datapb.SegmentInfo { + ret := _m.Called(collectionID, id, scope) + + var r0 *datapb.SegmentInfo + if rf, ok := ret.Get(0).(func(int64, int64, int32) *datapb.SegmentInfo); ok { + r0 = rf(collectionID, id, scope) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.SegmentInfo) + } + } + + return r0 +} + +// MockTargetManager_GetSealedSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSealedSegment' +type MockTargetManager_GetSealedSegment_Call struct { + *mock.Call +} + +// GetSealedSegment is a helper method to define mock.On call +// - collectionID int64 +// - id int64 +// - scope int32 +func (_e *MockTargetManager_Expecter) GetSealedSegment(collectionID interface{}, id interface{}, scope interface{}) *MockTargetManager_GetSealedSegment_Call { + return &MockTargetManager_GetSealedSegment_Call{Call: _e.mock.On("GetSealedSegment", collectionID, id, scope)} +} + +func (_c *MockTargetManager_GetSealedSegment_Call) Run(run func(collectionID int64, id int64, scope int32)) *MockTargetManager_GetSealedSegment_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(int64), args[2].(int32)) + }) + return _c +} + +func (_c *MockTargetManager_GetSealedSegment_Call) Return(_a0 *datapb.SegmentInfo) *MockTargetManager_GetSealedSegment_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_GetSealedSegment_Call) RunAndReturn(run func(int64, int64, int32) *datapb.SegmentInfo) *MockTargetManager_GetSealedSegment_Call { + _c.Call.Return(run) + return _c +} + +// GetSealedSegmentsByChannel provides a mock function with given fields: collectionID, channelName, scope +func (_m *MockTargetManager) GetSealedSegmentsByChannel(collectionID int64, channelName string, scope int32) map[int64]*datapb.SegmentInfo { + ret := _m.Called(collectionID, channelName, scope) + + var r0 map[int64]*datapb.SegmentInfo + if rf, ok := ret.Get(0).(func(int64, string, int32) map[int64]*datapb.SegmentInfo); ok { + r0 = rf(collectionID, channelName, scope) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo) + } + } + + return r0 +} + +// MockTargetManager_GetSealedSegmentsByChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSealedSegmentsByChannel' +type MockTargetManager_GetSealedSegmentsByChannel_Call struct { + *mock.Call +} + +// GetSealedSegmentsByChannel is a helper method to define mock.On call +// - collectionID int64 +// - channelName string +// - scope int32 +func (_e *MockTargetManager_Expecter) GetSealedSegmentsByChannel(collectionID interface{}, channelName interface{}, scope interface{}) *MockTargetManager_GetSealedSegmentsByChannel_Call { + return &MockTargetManager_GetSealedSegmentsByChannel_Call{Call: _e.mock.On("GetSealedSegmentsByChannel", collectionID, channelName, scope)} +} + +func (_c *MockTargetManager_GetSealedSegmentsByChannel_Call) Run(run func(collectionID int64, channelName string, scope int32)) *MockTargetManager_GetSealedSegmentsByChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(string), args[2].(int32)) + }) + return _c +} + +func (_c *MockTargetManager_GetSealedSegmentsByChannel_Call) Return(_a0 map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByChannel_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_GetSealedSegmentsByChannel_Call) RunAndReturn(run func(int64, string, int32) map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByChannel_Call { + _c.Call.Return(run) + return _c +} + +// GetSealedSegmentsByCollection provides a mock function with given fields: collectionID, scope +func (_m *MockTargetManager) GetSealedSegmentsByCollection(collectionID int64, scope int32) map[int64]*datapb.SegmentInfo { + ret := _m.Called(collectionID, scope) + + var r0 map[int64]*datapb.SegmentInfo + if rf, ok := ret.Get(0).(func(int64, int32) map[int64]*datapb.SegmentInfo); ok { + r0 = rf(collectionID, scope) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo) + } + } + + return r0 +} + +// MockTargetManager_GetSealedSegmentsByCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSealedSegmentsByCollection' +type MockTargetManager_GetSealedSegmentsByCollection_Call struct { + *mock.Call +} + +// GetSealedSegmentsByCollection is a helper method to define mock.On call +// - collectionID int64 +// - scope int32 +func (_e *MockTargetManager_Expecter) GetSealedSegmentsByCollection(collectionID interface{}, scope interface{}) *MockTargetManager_GetSealedSegmentsByCollection_Call { + return &MockTargetManager_GetSealedSegmentsByCollection_Call{Call: _e.mock.On("GetSealedSegmentsByCollection", collectionID, scope)} +} + +func (_c *MockTargetManager_GetSealedSegmentsByCollection_Call) Run(run func(collectionID int64, scope int32)) *MockTargetManager_GetSealedSegmentsByCollection_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(int32)) + }) + return _c +} + +func (_c *MockTargetManager_GetSealedSegmentsByCollection_Call) Return(_a0 map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByCollection_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_GetSealedSegmentsByCollection_Call) RunAndReturn(run func(int64, int32) map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByCollection_Call { + _c.Call.Return(run) + return _c +} + +// GetSealedSegmentsByPartition provides a mock function with given fields: collectionID, partitionID, scope +func (_m *MockTargetManager) GetSealedSegmentsByPartition(collectionID int64, partitionID int64, scope int32) map[int64]*datapb.SegmentInfo { + ret := _m.Called(collectionID, partitionID, scope) + + var r0 map[int64]*datapb.SegmentInfo + if rf, ok := ret.Get(0).(func(int64, int64, int32) map[int64]*datapb.SegmentInfo); ok { + r0 = rf(collectionID, partitionID, scope) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo) + } + } + + return r0 +} + +// MockTargetManager_GetSealedSegmentsByPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSealedSegmentsByPartition' +type MockTargetManager_GetSealedSegmentsByPartition_Call struct { + *mock.Call +} + +// GetSealedSegmentsByPartition is a helper method to define mock.On call +// - collectionID int64 +// - partitionID int64 +// - scope int32 +func (_e *MockTargetManager_Expecter) GetSealedSegmentsByPartition(collectionID interface{}, partitionID interface{}, scope interface{}) *MockTargetManager_GetSealedSegmentsByPartition_Call { + return &MockTargetManager_GetSealedSegmentsByPartition_Call{Call: _e.mock.On("GetSealedSegmentsByPartition", collectionID, partitionID, scope)} +} + +func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) Run(run func(collectionID int64, partitionID int64, scope int32)) *MockTargetManager_GetSealedSegmentsByPartition_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(int64), args[2].(int32)) + }) + return _c +} + +func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) Return(_a0 map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByPartition_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) RunAndReturn(run func(int64, int64, int32) map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByPartition_Call { + _c.Call.Return(run) + return _c +} + +// IsCurrentTargetExist provides a mock function with given fields: collectionID +func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64) bool { + ret := _m.Called(collectionID) + + var r0 bool + if rf, ok := ret.Get(0).(func(int64) bool); ok { + r0 = rf(collectionID) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockTargetManager_IsCurrentTargetExist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsCurrentTargetExist' +type MockTargetManager_IsCurrentTargetExist_Call struct { + *mock.Call +} + +// IsCurrentTargetExist is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockTargetManager_Expecter) IsCurrentTargetExist(collectionID interface{}) *MockTargetManager_IsCurrentTargetExist_Call { + return &MockTargetManager_IsCurrentTargetExist_Call{Call: _e.mock.On("IsCurrentTargetExist", collectionID)} +} + +func (_c *MockTargetManager_IsCurrentTargetExist_Call) Run(run func(collectionID int64)) *MockTargetManager_IsCurrentTargetExist_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockTargetManager_IsCurrentTargetExist_Call) Return(_a0 bool) *MockTargetManager_IsCurrentTargetExist_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_IsCurrentTargetExist_Call { + _c.Call.Return(run) + return _c +} + +// IsNextTargetExist provides a mock function with given fields: collectionID +func (_m *MockTargetManager) IsNextTargetExist(collectionID int64) bool { + ret := _m.Called(collectionID) + + var r0 bool + if rf, ok := ret.Get(0).(func(int64) bool); ok { + r0 = rf(collectionID) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockTargetManager_IsNextTargetExist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsNextTargetExist' +type MockTargetManager_IsNextTargetExist_Call struct { + *mock.Call +} + +// IsNextTargetExist is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockTargetManager_Expecter) IsNextTargetExist(collectionID interface{}) *MockTargetManager_IsNextTargetExist_Call { + return &MockTargetManager_IsNextTargetExist_Call{Call: _e.mock.On("IsNextTargetExist", collectionID)} +} + +func (_c *MockTargetManager_IsNextTargetExist_Call) Run(run func(collectionID int64)) *MockTargetManager_IsNextTargetExist_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockTargetManager_IsNextTargetExist_Call) Return(_a0 bool) *MockTargetManager_IsNextTargetExist_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_IsNextTargetExist_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_IsNextTargetExist_Call { + _c.Call.Return(run) + return _c +} + +// PullNextTargetV1 provides a mock function with given fields: broker, collectionID, chosenPartitionIDs +func (_m *MockTargetManager) PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) { + _va := make([]interface{}, len(chosenPartitionIDs)) + for _i := range chosenPartitionIDs { + _va[_i] = chosenPartitionIDs[_i] + } + var _ca []interface{} + _ca = append(_ca, broker, collectionID) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 map[int64]*datapb.SegmentInfo + var r1 map[string]*DmChannel + var r2 error + if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)); ok { + return rf(broker, collectionID, chosenPartitionIDs...) + } + if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) map[int64]*datapb.SegmentInfo); ok { + r0 = rf(broker, collectionID, chosenPartitionIDs...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo) + } + } + + if rf, ok := ret.Get(1).(func(Broker, int64, ...int64) map[string]*DmChannel); ok { + r1 = rf(broker, collectionID, chosenPartitionIDs...) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(map[string]*DmChannel) + } + } + + if rf, ok := ret.Get(2).(func(Broker, int64, ...int64) error); ok { + r2 = rf(broker, collectionID, chosenPartitionIDs...) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockTargetManager_PullNextTargetV1_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullNextTargetV1' +type MockTargetManager_PullNextTargetV1_Call struct { + *mock.Call +} + +// PullNextTargetV1 is a helper method to define mock.On call +// - broker Broker +// - collectionID int64 +// - chosenPartitionIDs ...int64 +func (_e *MockTargetManager_Expecter) PullNextTargetV1(broker interface{}, collectionID interface{}, chosenPartitionIDs ...interface{}) *MockTargetManager_PullNextTargetV1_Call { + return &MockTargetManager_PullNextTargetV1_Call{Call: _e.mock.On("PullNextTargetV1", + append([]interface{}{broker, collectionID}, chosenPartitionIDs...)...)} +} + +func (_c *MockTargetManager_PullNextTargetV1_Call) Run(run func(broker Broker, collectionID int64, chosenPartitionIDs ...int64)) *MockTargetManager_PullNextTargetV1_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]int64, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(int64) + } + } + run(args[0].(Broker), args[1].(int64), variadicArgs...) + }) + return _c +} + +func (_c *MockTargetManager_PullNextTargetV1_Call) Return(_a0 map[int64]*datapb.SegmentInfo, _a1 map[string]*DmChannel, _a2 error) *MockTargetManager_PullNextTargetV1_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockTargetManager_PullNextTargetV1_Call) RunAndReturn(run func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)) *MockTargetManager_PullNextTargetV1_Call { + _c.Call.Return(run) + return _c +} + +// PullNextTargetV2 provides a mock function with given fields: broker, collectionID, chosenPartitionIDs +func (_m *MockTargetManager) PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) { + _va := make([]interface{}, len(chosenPartitionIDs)) + for _i := range chosenPartitionIDs { + _va[_i] = chosenPartitionIDs[_i] + } + var _ca []interface{} + _ca = append(_ca, broker, collectionID) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 map[int64]*datapb.SegmentInfo + var r1 map[string]*DmChannel + var r2 error + if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)); ok { + return rf(broker, collectionID, chosenPartitionIDs...) + } + if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) map[int64]*datapb.SegmentInfo); ok { + r0 = rf(broker, collectionID, chosenPartitionIDs...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo) + } + } + + if rf, ok := ret.Get(1).(func(Broker, int64, ...int64) map[string]*DmChannel); ok { + r1 = rf(broker, collectionID, chosenPartitionIDs...) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(map[string]*DmChannel) + } + } + + if rf, ok := ret.Get(2).(func(Broker, int64, ...int64) error); ok { + r2 = rf(broker, collectionID, chosenPartitionIDs...) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// MockTargetManager_PullNextTargetV2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullNextTargetV2' +type MockTargetManager_PullNextTargetV2_Call struct { + *mock.Call +} + +// PullNextTargetV2 is a helper method to define mock.On call +// - broker Broker +// - collectionID int64 +// - chosenPartitionIDs ...int64 +func (_e *MockTargetManager_Expecter) PullNextTargetV2(broker interface{}, collectionID interface{}, chosenPartitionIDs ...interface{}) *MockTargetManager_PullNextTargetV2_Call { + return &MockTargetManager_PullNextTargetV2_Call{Call: _e.mock.On("PullNextTargetV2", + append([]interface{}{broker, collectionID}, chosenPartitionIDs...)...)} +} + +func (_c *MockTargetManager_PullNextTargetV2_Call) Run(run func(broker Broker, collectionID int64, chosenPartitionIDs ...int64)) *MockTargetManager_PullNextTargetV2_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]int64, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(int64) + } + } + run(args[0].(Broker), args[1].(int64), variadicArgs...) + }) + return _c +} + +func (_c *MockTargetManager_PullNextTargetV2_Call) Return(_a0 map[int64]*datapb.SegmentInfo, _a1 map[string]*DmChannel, _a2 error) *MockTargetManager_PullNextTargetV2_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *MockTargetManager_PullNextTargetV2_Call) RunAndReturn(run func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)) *MockTargetManager_PullNextTargetV2_Call { + _c.Call.Return(run) + return _c +} + +// Recover provides a mock function with given fields: catalog +func (_m *MockTargetManager) Recover(catalog metastore.QueryCoordCatalog) error { + ret := _m.Called(catalog) + + var r0 error + if rf, ok := ret.Get(0).(func(metastore.QueryCoordCatalog) error); ok { + r0 = rf(catalog) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockTargetManager_Recover_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Recover' +type MockTargetManager_Recover_Call struct { + *mock.Call +} + +// Recover is a helper method to define mock.On call +// - catalog metastore.QueryCoordCatalog +func (_e *MockTargetManager_Expecter) Recover(catalog interface{}) *MockTargetManager_Recover_Call { + return &MockTargetManager_Recover_Call{Call: _e.mock.On("Recover", catalog)} +} + +func (_c *MockTargetManager_Recover_Call) Run(run func(catalog metastore.QueryCoordCatalog)) *MockTargetManager_Recover_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(metastore.QueryCoordCatalog)) + }) + return _c +} + +func (_c *MockTargetManager_Recover_Call) Return(_a0 error) *MockTargetManager_Recover_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_Recover_Call) RunAndReturn(run func(metastore.QueryCoordCatalog) error) *MockTargetManager_Recover_Call { + _c.Call.Return(run) + return _c +} + +// RemoveCollection provides a mock function with given fields: collectionID +func (_m *MockTargetManager) RemoveCollection(collectionID int64) { + _m.Called(collectionID) +} + +// MockTargetManager_RemoveCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveCollection' +type MockTargetManager_RemoveCollection_Call struct { + *mock.Call +} + +// RemoveCollection is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockTargetManager_Expecter) RemoveCollection(collectionID interface{}) *MockTargetManager_RemoveCollection_Call { + return &MockTargetManager_RemoveCollection_Call{Call: _e.mock.On("RemoveCollection", collectionID)} +} + +func (_c *MockTargetManager_RemoveCollection_Call) Run(run func(collectionID int64)) *MockTargetManager_RemoveCollection_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockTargetManager_RemoveCollection_Call) Return() *MockTargetManager_RemoveCollection_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTargetManager_RemoveCollection_Call) RunAndReturn(run func(int64)) *MockTargetManager_RemoveCollection_Call { + _c.Call.Return(run) + return _c +} + +// RemovePartition provides a mock function with given fields: collectionID, partitionIDs +func (_m *MockTargetManager) RemovePartition(collectionID int64, partitionIDs ...int64) { + _va := make([]interface{}, len(partitionIDs)) + for _i := range partitionIDs { + _va[_i] = partitionIDs[_i] + } + var _ca []interface{} + _ca = append(_ca, collectionID) + _ca = append(_ca, _va...) + _m.Called(_ca...) +} + +// MockTargetManager_RemovePartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemovePartition' +type MockTargetManager_RemovePartition_Call struct { + *mock.Call +} + +// RemovePartition is a helper method to define mock.On call +// - collectionID int64 +// - partitionIDs ...int64 +func (_e *MockTargetManager_Expecter) RemovePartition(collectionID interface{}, partitionIDs ...interface{}) *MockTargetManager_RemovePartition_Call { + return &MockTargetManager_RemovePartition_Call{Call: _e.mock.On("RemovePartition", + append([]interface{}{collectionID}, partitionIDs...)...)} +} + +func (_c *MockTargetManager_RemovePartition_Call) Run(run func(collectionID int64, partitionIDs ...int64)) *MockTargetManager_RemovePartition_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]int64, len(args)-1) + for i, a := range args[1:] { + if a != nil { + variadicArgs[i] = a.(int64) + } + } + run(args[0].(int64), variadicArgs...) + }) + return _c +} + +func (_c *MockTargetManager_RemovePartition_Call) Return() *MockTargetManager_RemovePartition_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTargetManager_RemovePartition_Call) RunAndReturn(run func(int64, ...int64)) *MockTargetManager_RemovePartition_Call { + _c.Call.Return(run) + return _c +} + +// SaveCurrentTarget provides a mock function with given fields: catalog +func (_m *MockTargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) { + _m.Called(catalog) +} + +// MockTargetManager_SaveCurrentTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveCurrentTarget' +type MockTargetManager_SaveCurrentTarget_Call struct { + *mock.Call +} + +// SaveCurrentTarget is a helper method to define mock.On call +// - catalog metastore.QueryCoordCatalog +func (_e *MockTargetManager_Expecter) SaveCurrentTarget(catalog interface{}) *MockTargetManager_SaveCurrentTarget_Call { + return &MockTargetManager_SaveCurrentTarget_Call{Call: _e.mock.On("SaveCurrentTarget", catalog)} +} + +func (_c *MockTargetManager_SaveCurrentTarget_Call) Run(run func(catalog metastore.QueryCoordCatalog)) *MockTargetManager_SaveCurrentTarget_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(metastore.QueryCoordCatalog)) + }) + return _c +} + +func (_c *MockTargetManager_SaveCurrentTarget_Call) Return() *MockTargetManager_SaveCurrentTarget_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTargetManager_SaveCurrentTarget_Call) RunAndReturn(run func(metastore.QueryCoordCatalog)) *MockTargetManager_SaveCurrentTarget_Call { + _c.Call.Return(run) + return _c +} + +// UpdateCollectionCurrentTarget provides a mock function with given fields: collectionID +func (_m *MockTargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool { + ret := _m.Called(collectionID) + + var r0 bool + if rf, ok := ret.Get(0).(func(int64) bool); ok { + r0 = rf(collectionID) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockTargetManager_UpdateCollectionCurrentTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateCollectionCurrentTarget' +type MockTargetManager_UpdateCollectionCurrentTarget_Call struct { + *mock.Call +} + +// UpdateCollectionCurrentTarget is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockTargetManager_Expecter) UpdateCollectionCurrentTarget(collectionID interface{}) *MockTargetManager_UpdateCollectionCurrentTarget_Call { + return &MockTargetManager_UpdateCollectionCurrentTarget_Call{Call: _e.mock.On("UpdateCollectionCurrentTarget", collectionID)} +} + +func (_c *MockTargetManager_UpdateCollectionCurrentTarget_Call) Run(run func(collectionID int64)) *MockTargetManager_UpdateCollectionCurrentTarget_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockTargetManager_UpdateCollectionCurrentTarget_Call) Return(_a0 bool) *MockTargetManager_UpdateCollectionCurrentTarget_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_UpdateCollectionCurrentTarget_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_UpdateCollectionCurrentTarget_Call { + _c.Call.Return(run) + return _c +} + +// UpdateCollectionNextTarget provides a mock function with given fields: collectionID +func (_m *MockTargetManager) UpdateCollectionNextTarget(collectionID int64) error { + ret := _m.Called(collectionID) + + var r0 error + if rf, ok := ret.Get(0).(func(int64) error); ok { + r0 = rf(collectionID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockTargetManager_UpdateCollectionNextTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateCollectionNextTarget' +type MockTargetManager_UpdateCollectionNextTarget_Call struct { + *mock.Call +} + +// UpdateCollectionNextTarget is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockTargetManager_Expecter) UpdateCollectionNextTarget(collectionID interface{}) *MockTargetManager_UpdateCollectionNextTarget_Call { + return &MockTargetManager_UpdateCollectionNextTarget_Call{Call: _e.mock.On("UpdateCollectionNextTarget", collectionID)} +} + +func (_c *MockTargetManager_UpdateCollectionNextTarget_Call) Run(run func(collectionID int64)) *MockTargetManager_UpdateCollectionNextTarget_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockTargetManager_UpdateCollectionNextTarget_Call) Return(_a0 error) *MockTargetManager_UpdateCollectionNextTarget_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTargetManager_UpdateCollectionNextTarget_Call) RunAndReturn(run func(int64) error) *MockTargetManager_UpdateCollectionNextTarget_Call { + _c.Call.Return(run) + return _c +} + +// NewMockTargetManager creates a new instance of MockTargetManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockTargetManager(t interface { + mock.TestingT + Cleanup(func()) +}) *MockTargetManager { + mock := &MockTargetManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 6808e6ca9a..376fd37569 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -50,6 +50,29 @@ const ( NextTargetFirst ) +type TargetManagerInterface interface { + UpdateCollectionCurrentTarget(collectionID int64) bool + UpdateCollectionNextTarget(collectionID int64) error + PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) + PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) + RemoveCollection(collectionID int64) + RemovePartition(collectionID int64, partitionIDs ...int64) + GetGrowingSegmentsByCollection(collectionID int64, scope TargetScope) typeutil.UniqueSet + GetGrowingSegmentsByChannel(collectionID int64, channelName string, scope TargetScope) typeutil.UniqueSet + GetSealedSegmentsByCollection(collectionID int64, scope TargetScope) map[int64]*datapb.SegmentInfo + GetSealedSegmentsByChannel(collectionID int64, channelName string, scope TargetScope) map[int64]*datapb.SegmentInfo + GetDroppedSegmentsByChannel(collectionID int64, channelName string, scope TargetScope) []int64 + GetSealedSegmentsByPartition(collectionID int64, partitionID int64, scope TargetScope) map[int64]*datapb.SegmentInfo + GetDmChannelsByCollection(collectionID int64, scope TargetScope) map[string]*DmChannel + GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel + GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo + GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 + IsCurrentTargetExist(collectionID int64) bool + IsNextTargetExist(collectionID int64) bool + SaveCurrentTarget(catalog metastore.QueryCoordCatalog) + Recover(catalog metastore.QueryCoordCatalog) error +} + type TargetManager struct { rwMutex sync.RWMutex broker Broker diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 57f059f533..a74c2c1462 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -130,6 +130,10 @@ type QueryNode struct { // parameter turning hook queryHook optimizers.QueryHook + + // record the last modify ts of segment/channel distribution + lastModifyLock sync.RWMutex + lastModifyTs int64 } // NewQueryNode will return a QueryNode with abnormal state. diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 54bf390ba8..f7807886ce 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -21,6 +21,7 @@ import ( "fmt" "strconv" "sync" + "time" "github.com/golang/protobuf/proto" "github.com/samber/lo" @@ -190,6 +191,8 @@ func (node *QueryNode) composeIndexMeta(indexInfos []*indexpb.IndexInfo, schema // WatchDmChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (status *commonpb.Status, e error) { + defer node.updateDistributionModifyTS() + channel := req.GetInfos()[0] log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), @@ -339,6 +342,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm } func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannelRequest) (*commonpb.Status, error) { + defer node.updateDistributionModifyTS() log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannelName()), @@ -396,6 +400,7 @@ func (node *QueryNode) LoadPartitions(ctx context.Context, req *querypb.LoadPart // LoadSegments load historical data into query node, historical data can be vector data or index func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) { + defer node.updateDistributionModifyTS() segment := req.GetInfos()[0] log := log.Ctx(ctx).With( @@ -528,6 +533,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, req *querypb.Relea // ReleaseSegments remove the specified segments from query node according segmentIDs, partitionIDs, and collectionID func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) { + defer node.updateDistributionModifyTS() log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), zap.String("shard", req.GetShard()), @@ -1175,6 +1181,23 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get } defer node.lifetime.Done() + lastModifyTs := node.getDistributionModifyTS() + distributionChange := func() bool { + if req.GetLastUpdateTs() == 0 { + return true + } + + return req.GetLastUpdateTs() < lastModifyTs + } + + if !distributionChange() { + return &querypb.GetDataDistributionResponse{ + Status: merr.Success(), + NodeID: node.GetNodeID(), + LastModifyTs: lastModifyTs, + }, nil + } + sealedSegments := node.manager.Segment.GetBy(segments.WithType(commonpb.SegmentState_Sealed)) segmentVersionInfos := make([]*querypb.SegmentVersionInfo, 0, len(sealedSegments)) for _, s := range sealedSegments { @@ -1240,15 +1263,18 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get }) return &querypb.GetDataDistributionResponse{ - Status: merr.Success(), - NodeID: node.GetNodeID(), - Segments: segmentVersionInfos, - Channels: channelVersionInfos, - LeaderViews: leaderViews, + Status: merr.Success(), + NodeID: node.GetNodeID(), + Segments: segmentVersionInfos, + Channels: channelVersionInfos, + LeaderViews: leaderViews, + LastModifyTs: lastModifyTs, }, nil } func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) { + defer node.updateDistributionModifyTS() + log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannel()), zap.Int64("currentNodeID", node.GetNodeID())) // check node healthy @@ -1403,3 +1429,16 @@ func (req *deleteRequestStringer) String() string { tss := req.GetTimestamps() return fmt.Sprintf("%s, timestamp range: [%d-%d]", pkInfo, tss[0], tss[len(tss)-1]) } + +func (node *QueryNode) updateDistributionModifyTS() { + node.lastModifyLock.Lock() + defer node.lastModifyLock.Unlock() + + node.lastModifyTs = time.Now().UnixNano() +} + +func (node *QueryNode) getDistributionModifyTS() int64 { + node.lastModifyLock.RLock() + defer node.lastModifyLock.RUnlock() + return node.lastModifyTs +}