From f1c99869747a71474ef745bbb104999d0ece2379 Mon Sep 17 00:00:00 2001
From: wei liu <wei.liu@zilliz.com>
Date: Fri, 17 May 2024 10:11:37 +0800
Subject: [PATCH] enhance: Skip return data distribution if no change happen
 (#32814)

issue: #32813

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
---
 Makefile                                      |   1 +
 internal/proto/query_coord.proto              |   2 +
 .../querycoordv2/dist/dist_controller_test.go |  25 +-
 internal/querycoordv2/dist/dist_handler.go    |  66 +-
 .../querycoordv2/dist/dist_handler_test.go    | 126 +++
 .../querycoordv2/meta/mock_target_manager.go  | 975 ++++++++++++++++++
 internal/querycoordv2/meta/target_manager.go  |  23 +
 internal/querynodev2/server.go                |   5 +
 internal/querynodev2/services.go              |  49 +-
 9 files changed, 1231 insertions(+), 41 deletions(-)
 create mode 100644 internal/querycoordv2/dist/dist_handler_test.go
 create mode 100644 internal/querycoordv2/meta/mock_target_manager.go

diff --git a/Makefile b/Makefile
index 268f7f994f..24cf720f10 100644
--- a/Makefile
+++ b/Makefile
@@ -428,6 +428,7 @@ generate-mockery-proxy: getdeps
 generate-mockery-querycoord: getdeps
 	$(INSTALL_PATH)/mockery --name=QueryNodeServer --dir=$(PWD)/internal/proto/querypb/ --output=$(PWD)/internal/querycoordv2/mocks --filename=mock_querynode.go --with-expecter --structname=MockQueryNodeServer
 	$(INSTALL_PATH)/mockery --name=Broker --dir=$(PWD)/internal/querycoordv2/meta --output=$(PWD)/internal/querycoordv2/meta --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=meta
+	$(INSTALL_PATH)/mockery --name=TargetManagerInterface --dir=$(PWD)/internal/querycoordv2/meta --output=$(PWD)/internal/querycoordv2/meta --filename=mock_target_manager.go --with-expecter --structname=MockTargetManager --inpackage
 	$(INSTALL_PATH)/mockery --name=Scheduler --dir=$(PWD)/internal/querycoordv2/task --output=$(PWD)/internal/querycoordv2/task --filename=mock_scheduler.go --with-expecter --structname=MockScheduler --outpkg=task --inpackage
 	$(INSTALL_PATH)/mockery --name=Cluster --dir=$(PWD)/internal/querycoordv2/session --output=$(PWD)/internal/querycoordv2/session --filename=mock_cluster.go --with-expecter --structname=MockCluster --outpkg=session --inpackage
 	$(INSTALL_PATH)/mockery --name=Balance --dir=$(PWD)/internal/querycoordv2/balance --output=$(PWD)/internal/querycoordv2/balance --filename=mock_balancer.go --with-expecter --structname=MockBalancer --outpkg=balance --inpackage
diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto
index 2f5facab9d..808c94d912 100644
--- a/internal/proto/query_coord.proto
+++ b/internal/proto/query_coord.proto
@@ -589,6 +589,7 @@ message SealedSegmentsChangeInfo {
 message GetDataDistributionRequest {
     common.MsgBase base = 1;
     map<string, msg.MsgPosition> checkpoints = 2;
+    int64 lastUpdateTs = 3;
 }
 
 message GetDataDistributionResponse {
@@ -597,6 +598,7 @@ message GetDataDistributionResponse {
     repeated SegmentVersionInfo segments = 3;
     repeated ChannelVersionInfo channels = 4;
     repeated LeaderView leader_views = 5;
+    int64 lastModifyTs = 6;
 }
 
 message LeaderView {
diff --git a/internal/querycoordv2/dist/dist_controller_test.go b/internal/querycoordv2/dist/dist_controller_test.go
index 21f21f0244..f69917b9b0 100644
--- a/internal/querycoordv2/dist/dist_controller_test.go
+++ b/internal/querycoordv2/dist/dist_controller_test.go
@@ -48,6 +48,8 @@ type DistControllerTestSuite struct {
 	kv     kv.MetaKv
 	meta   *meta.Meta
 	broker *meta.MockBroker
+
+	nodeMgr *session.NodeManager
 }
 
 func (suite *DistControllerTestSuite) SetupTest() {
@@ -69,15 +71,16 @@ func (suite *DistControllerTestSuite) SetupTest() {
 	// meta
 	store := querycoord.NewCatalog(suite.kv)
 	idAllocator := RandomIncrementIDAllocator()
-	suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager())
+
+	suite.nodeMgr = session.NewNodeManager()
+	suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
 
 	suite.mockCluster = session.NewMockCluster(suite.T())
-	nodeManager := session.NewNodeManager()
 	distManager := meta.NewDistributionManager()
 	suite.broker = meta.NewMockBroker(suite.T())
 	targetManager := meta.NewTargetManager(suite.broker, suite.meta)
 	suite.mockScheduler = task.NewMockScheduler(suite.T())
-	suite.controller = NewDistController(suite.mockCluster, nodeManager, distManager, targetManager, suite.mockScheduler)
+	suite.controller = NewDistController(suite.mockCluster, suite.nodeMgr, distManager, targetManager, suite.mockScheduler)
 }
 
 func (suite *DistControllerTestSuite) TearDownSuite() {
@@ -85,6 +88,11 @@ func (suite *DistControllerTestSuite) TearDownSuite() {
 }
 
 func (suite *DistControllerTestSuite) TestStart() {
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   1,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
 	dispatchCalled := atomic.NewBool(false)
 	suite.mockCluster.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(
 		&querypb.GetDataDistributionResponse{Status: merr.Success(), NodeID: 1},
@@ -133,6 +141,17 @@ func (suite *DistControllerTestSuite) TestStop() {
 }
 
 func (suite *DistControllerTestSuite) TestSyncAll() {
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   1,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   2,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
 	suite.controller.StartDistInstance(context.TODO(), 1)
 	suite.controller.StartDistInstance(context.TODO(), 2)
 
diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go
index 2c08e6f1f8..1d396a4150 100644
--- a/internal/querycoordv2/dist/dist_handler.go
+++ b/internal/querycoordv2/dist/dist_handler.go
@@ -26,7 +26,6 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
-	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@@ -40,16 +39,17 @@ import (
 )
 
 type distHandler struct {
-	nodeID      int64
-	c           chan struct{}
-	wg          sync.WaitGroup
-	client      session.Cluster
-	nodeManager *session.NodeManager
-	scheduler   task.Scheduler
-	dist        *meta.DistributionManager
-	target      *meta.TargetManager
-	mu          sync.Mutex
-	stopOnce    sync.Once
+	nodeID       int64
+	c            chan struct{}
+	wg           sync.WaitGroup
+	client       session.Cluster
+	nodeManager  *session.NodeManager
+	scheduler    task.Scheduler
+	dist         *meta.DistributionManager
+	target       meta.TargetManagerInterface
+	mu           sync.Mutex
+	stopOnce     sync.Once
+	lastUpdateTs int64
 }
 
 func (dh *distHandler) start(ctx context.Context) {
@@ -87,21 +87,31 @@ func (dh *distHandler) start(ctx context.Context) {
 
 func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse) {
 	node := dh.nodeManager.Get(resp.GetNodeID())
-	if node != nil {
+	if node == nil {
+		return
+	}
+
+	if time.Since(node.LastHeartbeat()) > paramtable.Get().QueryCoordCfg.HeartBeatWarningLag.GetAsDuration(time.Millisecond) {
+		log.Warn("node last heart beat time lag too behind", zap.Time("now", time.Now()),
+			zap.Time("lastHeartBeatTime", node.LastHeartbeat()), zap.Int64("nodeID", node.ID()))
+	}
+	node.SetLastHeartbeat(time.Now())
+
+	// skip  update dist if no distribution change happens in query node
+	if resp.GetLastModifyTs() <= dh.lastUpdateTs {
+		log.RatedInfo(30, "skip update dist due to no distribution change", zap.Int64("lastModifyTs", resp.GetLastModifyTs()), zap.Int64("lastUpdateTs", dh.lastUpdateTs))
+	} else {
+		dh.lastUpdateTs = resp.GetLastModifyTs()
+
 		node.UpdateStats(
 			session.WithSegmentCnt(len(resp.GetSegments())),
 			session.WithChannelCnt(len(resp.GetChannels())),
 		)
-		if time.Since(node.LastHeartbeat()) > paramtable.Get().QueryCoordCfg.HeartBeatWarningLag.GetAsDuration(time.Millisecond) {
-			log.Warn("node last heart beat time lag too behind", zap.Time("now", time.Now()),
-				zap.Time("lastHeartBeatTime", node.LastHeartbeat()), zap.Int64("nodeID", node.ID()))
-		}
-		node.SetLastHeartbeat(time.Now())
-	}
 
-	dh.updateSegmentsDistribution(resp)
-	dh.updateChannelsDistribution(resp)
-	dh.updateLeaderView(resp)
+		dh.updateSegmentsDistribution(resp)
+		dh.updateChannelsDistribution(resp)
+		dh.updateLeaderView(resp)
+	}
 
 	dh.scheduler.Dispatch(dh.nodeID)
 }
@@ -214,23 +224,13 @@ func (dh *distHandler) getDistribution(ctx context.Context) (*querypb.GetDataDis
 	dh.mu.Lock()
 	defer dh.mu.Unlock()
 
-	channels := make(map[string]*msgpb.MsgPosition)
-	for _, channel := range dh.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(dh.nodeID)) {
-		targetChannel := dh.target.GetDmChannel(channel.GetCollectionID(), channel.GetChannelName(), meta.CurrentTarget)
-		if targetChannel == nil {
-			continue
-		}
-
-		channels[channel.GetChannelName()] = targetChannel.GetSeekPosition()
-	}
-
 	ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.DistributionRequestTimeout.GetAsDuration(time.Millisecond))
 	defer cancel()
 	resp, err := dh.client.GetDataDistribution(ctx, dh.nodeID, &querypb.GetDataDistributionRequest{
 		Base: commonpbutil.NewMsgBase(
 			commonpbutil.WithMsgType(commonpb.MsgType_GetDistribution),
 		),
-		Checkpoints: channels,
+		LastUpdateTs: dh.lastUpdateTs,
 	})
 	if err != nil {
 		return nil, err
@@ -259,7 +259,7 @@ func newDistHandler(
 	nodeManager *session.NodeManager,
 	scheduler task.Scheduler,
 	dist *meta.DistributionManager,
-	targetMgr *meta.TargetManager,
+	targetMgr meta.TargetManagerInterface,
 ) *distHandler {
 	h := &distHandler{
 		nodeID:      nodeID,
diff --git a/internal/querycoordv2/dist/dist_handler_test.go b/internal/querycoordv2/dist/dist_handler_test.go
new file mode 100644
index 0000000000..7d20878da5
--- /dev/null
+++ b/internal/querycoordv2/dist/dist_handler_test.go
@@ -0,0 +1,126 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dist
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
+	"github.com/milvus-io/milvus/internal/querycoordv2/session"
+	"github.com/milvus-io/milvus/internal/querycoordv2/task"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+)
+
+type DistHandlerSuite struct {
+	suite.Suite
+
+	ctx    context.Context
+	meta   *meta.Meta
+	broker *meta.MockBroker
+
+	nodeID      int64
+	client      *session.MockCluster
+	nodeManager *session.NodeManager
+	scheduler   *task.MockScheduler
+	dist        *meta.DistributionManager
+	target      *meta.MockTargetManager
+
+	handler *distHandler
+}
+
+func (suite *DistHandlerSuite) SetupSuite() {
+	paramtable.Init()
+	suite.nodeID = 1
+	suite.client = session.NewMockCluster(suite.T())
+	suite.nodeManager = session.NewNodeManager()
+	suite.scheduler = task.NewMockScheduler(suite.T())
+	suite.dist = meta.NewDistributionManager()
+
+	suite.target = meta.NewMockTargetManager(suite.T())
+	suite.ctx = context.Background()
+
+	suite.scheduler.EXPECT().Dispatch(mock.Anything).Maybe()
+	suite.target.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
+	suite.target.EXPECT().GetDmChannel(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
+}
+
+func (suite *DistHandlerSuite) TestBasic() {
+	suite.nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   1,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	suite.client.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{
+		Status: merr.Success(),
+		NodeID: 1,
+		Channels: []*querypb.ChannelVersionInfo{
+			{
+				Channel:    "test-channel-1",
+				Collection: 1,
+				Version:    1,
+			},
+		},
+		Segments: []*querypb.SegmentVersionInfo{
+			{
+				ID:         1,
+				Collection: 1,
+				Partition:  1,
+				Channel:    "test-channel-1",
+				Version:    1,
+			},
+		},
+
+		LeaderViews: []*querypb.LeaderView{
+			{
+				Collection: 1,
+				Channel:    "test-channel-1",
+			},
+		},
+		LastModifyTs: 1,
+	}, nil)
+
+	suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target)
+	defer suite.handler.stop()
+
+	time.Sleep(10 * time.Second)
+}
+
+func (suite *DistHandlerSuite) TestGetDistributionFailed() {
+	suite.nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   1,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	suite.client.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fake error"))
+
+	suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target)
+	defer suite.handler.stop()
+
+	time.Sleep(10 * time.Second)
+}
+
+func TestDistHandlerSuite(t *testing.T) {
+	suite.Run(t, new(DistHandlerSuite))
+}
diff --git a/internal/querycoordv2/meta/mock_target_manager.go b/internal/querycoordv2/meta/mock_target_manager.go
new file mode 100644
index 0000000000..5728fd2903
--- /dev/null
+++ b/internal/querycoordv2/meta/mock_target_manager.go
@@ -0,0 +1,975 @@
+// Code generated by mockery v2.32.4. DO NOT EDIT.
+
+package meta
+
+import (
+	metastore "github.com/milvus-io/milvus/internal/metastore"
+	datapb "github.com/milvus-io/milvus/internal/proto/datapb"
+
+	mock "github.com/stretchr/testify/mock"
+
+	typeutil "github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+// MockTargetManager is an autogenerated mock type for the TargetManagerInterface type
+type MockTargetManager struct {
+	mock.Mock
+}
+
+type MockTargetManager_Expecter struct {
+	mock *mock.Mock
+}
+
+func (_m *MockTargetManager) EXPECT() *MockTargetManager_Expecter {
+	return &MockTargetManager_Expecter{mock: &_m.Mock}
+}
+
+// GetCollectionTargetVersion provides a mock function with given fields: collectionID, scope
+func (_m *MockTargetManager) GetCollectionTargetVersion(collectionID int64, scope int32) int64 {
+	ret := _m.Called(collectionID, scope)
+
+	var r0 int64
+	if rf, ok := ret.Get(0).(func(int64, int32) int64); ok {
+		r0 = rf(collectionID, scope)
+	} else {
+		r0 = ret.Get(0).(int64)
+	}
+
+	return r0
+}
+
+// MockTargetManager_GetCollectionTargetVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollectionTargetVersion'
+type MockTargetManager_GetCollectionTargetVersion_Call struct {
+	*mock.Call
+}
+
+// GetCollectionTargetVersion is a helper method to define mock.On call
+//   - collectionID int64
+//   - scope int32
+func (_e *MockTargetManager_Expecter) GetCollectionTargetVersion(collectionID interface{}, scope interface{}) *MockTargetManager_GetCollectionTargetVersion_Call {
+	return &MockTargetManager_GetCollectionTargetVersion_Call{Call: _e.mock.On("GetCollectionTargetVersion", collectionID, scope)}
+}
+
+func (_c *MockTargetManager_GetCollectionTargetVersion_Call) Run(run func(collectionID int64, scope int32)) *MockTargetManager_GetCollectionTargetVersion_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64), args[1].(int32))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_GetCollectionTargetVersion_Call) Return(_a0 int64) *MockTargetManager_GetCollectionTargetVersion_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_GetCollectionTargetVersion_Call) RunAndReturn(run func(int64, int32) int64) *MockTargetManager_GetCollectionTargetVersion_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// GetDmChannel provides a mock function with given fields: collectionID, channel, scope
+func (_m *MockTargetManager) GetDmChannel(collectionID int64, channel string, scope int32) *DmChannel {
+	ret := _m.Called(collectionID, channel, scope)
+
+	var r0 *DmChannel
+	if rf, ok := ret.Get(0).(func(int64, string, int32) *DmChannel); ok {
+		r0 = rf(collectionID, channel, scope)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*DmChannel)
+		}
+	}
+
+	return r0
+}
+
+// MockTargetManager_GetDmChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDmChannel'
+type MockTargetManager_GetDmChannel_Call struct {
+	*mock.Call
+}
+
+// GetDmChannel is a helper method to define mock.On call
+//   - collectionID int64
+//   - channel string
+//   - scope int32
+func (_e *MockTargetManager_Expecter) GetDmChannel(collectionID interface{}, channel interface{}, scope interface{}) *MockTargetManager_GetDmChannel_Call {
+	return &MockTargetManager_GetDmChannel_Call{Call: _e.mock.On("GetDmChannel", collectionID, channel, scope)}
+}
+
+func (_c *MockTargetManager_GetDmChannel_Call) Run(run func(collectionID int64, channel string, scope int32)) *MockTargetManager_GetDmChannel_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64), args[1].(string), args[2].(int32))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_GetDmChannel_Call) Return(_a0 *DmChannel) *MockTargetManager_GetDmChannel_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_GetDmChannel_Call) RunAndReturn(run func(int64, string, int32) *DmChannel) *MockTargetManager_GetDmChannel_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// GetDmChannelsByCollection provides a mock function with given fields: collectionID, scope
+func (_m *MockTargetManager) GetDmChannelsByCollection(collectionID int64, scope int32) map[string]*DmChannel {
+	ret := _m.Called(collectionID, scope)
+
+	var r0 map[string]*DmChannel
+	if rf, ok := ret.Get(0).(func(int64, int32) map[string]*DmChannel); ok {
+		r0 = rf(collectionID, scope)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(map[string]*DmChannel)
+		}
+	}
+
+	return r0
+}
+
+// MockTargetManager_GetDmChannelsByCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDmChannelsByCollection'
+type MockTargetManager_GetDmChannelsByCollection_Call struct {
+	*mock.Call
+}
+
+// GetDmChannelsByCollection is a helper method to define mock.On call
+//   - collectionID int64
+//   - scope int32
+func (_e *MockTargetManager_Expecter) GetDmChannelsByCollection(collectionID interface{}, scope interface{}) *MockTargetManager_GetDmChannelsByCollection_Call {
+	return &MockTargetManager_GetDmChannelsByCollection_Call{Call: _e.mock.On("GetDmChannelsByCollection", collectionID, scope)}
+}
+
+func (_c *MockTargetManager_GetDmChannelsByCollection_Call) Run(run func(collectionID int64, scope int32)) *MockTargetManager_GetDmChannelsByCollection_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64), args[1].(int32))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_GetDmChannelsByCollection_Call) Return(_a0 map[string]*DmChannel) *MockTargetManager_GetDmChannelsByCollection_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_GetDmChannelsByCollection_Call) RunAndReturn(run func(int64, int32) map[string]*DmChannel) *MockTargetManager_GetDmChannelsByCollection_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// GetDroppedSegmentsByChannel provides a mock function with given fields: collectionID, channelName, scope
+func (_m *MockTargetManager) GetDroppedSegmentsByChannel(collectionID int64, channelName string, scope int32) []int64 {
+	ret := _m.Called(collectionID, channelName, scope)
+
+	var r0 []int64
+	if rf, ok := ret.Get(0).(func(int64, string, int32) []int64); ok {
+		r0 = rf(collectionID, channelName, scope)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]int64)
+		}
+	}
+
+	return r0
+}
+
+// MockTargetManager_GetDroppedSegmentsByChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDroppedSegmentsByChannel'
+type MockTargetManager_GetDroppedSegmentsByChannel_Call struct {
+	*mock.Call
+}
+
+// GetDroppedSegmentsByChannel is a helper method to define mock.On call
+//   - collectionID int64
+//   - channelName string
+//   - scope int32
+func (_e *MockTargetManager_Expecter) GetDroppedSegmentsByChannel(collectionID interface{}, channelName interface{}, scope interface{}) *MockTargetManager_GetDroppedSegmentsByChannel_Call {
+	return &MockTargetManager_GetDroppedSegmentsByChannel_Call{Call: _e.mock.On("GetDroppedSegmentsByChannel", collectionID, channelName, scope)}
+}
+
+func (_c *MockTargetManager_GetDroppedSegmentsByChannel_Call) Run(run func(collectionID int64, channelName string, scope int32)) *MockTargetManager_GetDroppedSegmentsByChannel_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64), args[1].(string), args[2].(int32))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_GetDroppedSegmentsByChannel_Call) Return(_a0 []int64) *MockTargetManager_GetDroppedSegmentsByChannel_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_GetDroppedSegmentsByChannel_Call) RunAndReturn(run func(int64, string, int32) []int64) *MockTargetManager_GetDroppedSegmentsByChannel_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// GetGrowingSegmentsByChannel provides a mock function with given fields: collectionID, channelName, scope
+func (_m *MockTargetManager) GetGrowingSegmentsByChannel(collectionID int64, channelName string, scope int32) typeutil.Set[int64] {
+	ret := _m.Called(collectionID, channelName, scope)
+
+	var r0 typeutil.Set[int64]
+	if rf, ok := ret.Get(0).(func(int64, string, int32) typeutil.Set[int64]); ok {
+		r0 = rf(collectionID, channelName, scope)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(typeutil.Set[int64])
+		}
+	}
+
+	return r0
+}
+
+// MockTargetManager_GetGrowingSegmentsByChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetGrowingSegmentsByChannel'
+type MockTargetManager_GetGrowingSegmentsByChannel_Call struct {
+	*mock.Call
+}
+
+// GetGrowingSegmentsByChannel is a helper method to define mock.On call
+//   - collectionID int64
+//   - channelName string
+//   - scope int32
+func (_e *MockTargetManager_Expecter) GetGrowingSegmentsByChannel(collectionID interface{}, channelName interface{}, scope interface{}) *MockTargetManager_GetGrowingSegmentsByChannel_Call {
+	return &MockTargetManager_GetGrowingSegmentsByChannel_Call{Call: _e.mock.On("GetGrowingSegmentsByChannel", collectionID, channelName, scope)}
+}
+
+func (_c *MockTargetManager_GetGrowingSegmentsByChannel_Call) Run(run func(collectionID int64, channelName string, scope int32)) *MockTargetManager_GetGrowingSegmentsByChannel_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64), args[1].(string), args[2].(int32))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_GetGrowingSegmentsByChannel_Call) Return(_a0 typeutil.Set[int64]) *MockTargetManager_GetGrowingSegmentsByChannel_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_GetGrowingSegmentsByChannel_Call) RunAndReturn(run func(int64, string, int32) typeutil.Set[int64]) *MockTargetManager_GetGrowingSegmentsByChannel_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// GetGrowingSegmentsByCollection provides a mock function with given fields: collectionID, scope
+func (_m *MockTargetManager) GetGrowingSegmentsByCollection(collectionID int64, scope int32) typeutil.Set[int64] {
+	ret := _m.Called(collectionID, scope)
+
+	var r0 typeutil.Set[int64]
+	if rf, ok := ret.Get(0).(func(int64, int32) typeutil.Set[int64]); ok {
+		r0 = rf(collectionID, scope)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(typeutil.Set[int64])
+		}
+	}
+
+	return r0
+}
+
+// MockTargetManager_GetGrowingSegmentsByCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetGrowingSegmentsByCollection'
+type MockTargetManager_GetGrowingSegmentsByCollection_Call struct {
+	*mock.Call
+}
+
+// GetGrowingSegmentsByCollection is a helper method to define mock.On call
+//   - collectionID int64
+//   - scope int32
+func (_e *MockTargetManager_Expecter) GetGrowingSegmentsByCollection(collectionID interface{}, scope interface{}) *MockTargetManager_GetGrowingSegmentsByCollection_Call {
+	return &MockTargetManager_GetGrowingSegmentsByCollection_Call{Call: _e.mock.On("GetGrowingSegmentsByCollection", collectionID, scope)}
+}
+
+func (_c *MockTargetManager_GetGrowingSegmentsByCollection_Call) Run(run func(collectionID int64, scope int32)) *MockTargetManager_GetGrowingSegmentsByCollection_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64), args[1].(int32))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_GetGrowingSegmentsByCollection_Call) Return(_a0 typeutil.Set[int64]) *MockTargetManager_GetGrowingSegmentsByCollection_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_GetGrowingSegmentsByCollection_Call) RunAndReturn(run func(int64, int32) typeutil.Set[int64]) *MockTargetManager_GetGrowingSegmentsByCollection_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// GetSealedSegment provides a mock function with given fields: collectionID, id, scope
+func (_m *MockTargetManager) GetSealedSegment(collectionID int64, id int64, scope int32) *datapb.SegmentInfo {
+	ret := _m.Called(collectionID, id, scope)
+
+	var r0 *datapb.SegmentInfo
+	if rf, ok := ret.Get(0).(func(int64, int64, int32) *datapb.SegmentInfo); ok {
+		r0 = rf(collectionID, id, scope)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*datapb.SegmentInfo)
+		}
+	}
+
+	return r0
+}
+
+// MockTargetManager_GetSealedSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSealedSegment'
+type MockTargetManager_GetSealedSegment_Call struct {
+	*mock.Call
+}
+
+// GetSealedSegment is a helper method to define mock.On call
+//   - collectionID int64
+//   - id int64
+//   - scope int32
+func (_e *MockTargetManager_Expecter) GetSealedSegment(collectionID interface{}, id interface{}, scope interface{}) *MockTargetManager_GetSealedSegment_Call {
+	return &MockTargetManager_GetSealedSegment_Call{Call: _e.mock.On("GetSealedSegment", collectionID, id, scope)}
+}
+
+func (_c *MockTargetManager_GetSealedSegment_Call) Run(run func(collectionID int64, id int64, scope int32)) *MockTargetManager_GetSealedSegment_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64), args[1].(int64), args[2].(int32))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_GetSealedSegment_Call) Return(_a0 *datapb.SegmentInfo) *MockTargetManager_GetSealedSegment_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_GetSealedSegment_Call) RunAndReturn(run func(int64, int64, int32) *datapb.SegmentInfo) *MockTargetManager_GetSealedSegment_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// GetSealedSegmentsByChannel provides a mock function with given fields: collectionID, channelName, scope
+func (_m *MockTargetManager) GetSealedSegmentsByChannel(collectionID int64, channelName string, scope int32) map[int64]*datapb.SegmentInfo {
+	ret := _m.Called(collectionID, channelName, scope)
+
+	var r0 map[int64]*datapb.SegmentInfo
+	if rf, ok := ret.Get(0).(func(int64, string, int32) map[int64]*datapb.SegmentInfo); ok {
+		r0 = rf(collectionID, channelName, scope)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo)
+		}
+	}
+
+	return r0
+}
+
+// MockTargetManager_GetSealedSegmentsByChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSealedSegmentsByChannel'
+type MockTargetManager_GetSealedSegmentsByChannel_Call struct {
+	*mock.Call
+}
+
+// GetSealedSegmentsByChannel is a helper method to define mock.On call
+//   - collectionID int64
+//   - channelName string
+//   - scope int32
+func (_e *MockTargetManager_Expecter) GetSealedSegmentsByChannel(collectionID interface{}, channelName interface{}, scope interface{}) *MockTargetManager_GetSealedSegmentsByChannel_Call {
+	return &MockTargetManager_GetSealedSegmentsByChannel_Call{Call: _e.mock.On("GetSealedSegmentsByChannel", collectionID, channelName, scope)}
+}
+
+func (_c *MockTargetManager_GetSealedSegmentsByChannel_Call) Run(run func(collectionID int64, channelName string, scope int32)) *MockTargetManager_GetSealedSegmentsByChannel_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64), args[1].(string), args[2].(int32))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_GetSealedSegmentsByChannel_Call) Return(_a0 map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByChannel_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_GetSealedSegmentsByChannel_Call) RunAndReturn(run func(int64, string, int32) map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByChannel_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// GetSealedSegmentsByCollection provides a mock function with given fields: collectionID, scope
+func (_m *MockTargetManager) GetSealedSegmentsByCollection(collectionID int64, scope int32) map[int64]*datapb.SegmentInfo {
+	ret := _m.Called(collectionID, scope)
+
+	var r0 map[int64]*datapb.SegmentInfo
+	if rf, ok := ret.Get(0).(func(int64, int32) map[int64]*datapb.SegmentInfo); ok {
+		r0 = rf(collectionID, scope)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo)
+		}
+	}
+
+	return r0
+}
+
+// MockTargetManager_GetSealedSegmentsByCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSealedSegmentsByCollection'
+type MockTargetManager_GetSealedSegmentsByCollection_Call struct {
+	*mock.Call
+}
+
+// GetSealedSegmentsByCollection is a helper method to define mock.On call
+//   - collectionID int64
+//   - scope int32
+func (_e *MockTargetManager_Expecter) GetSealedSegmentsByCollection(collectionID interface{}, scope interface{}) *MockTargetManager_GetSealedSegmentsByCollection_Call {
+	return &MockTargetManager_GetSealedSegmentsByCollection_Call{Call: _e.mock.On("GetSealedSegmentsByCollection", collectionID, scope)}
+}
+
+func (_c *MockTargetManager_GetSealedSegmentsByCollection_Call) Run(run func(collectionID int64, scope int32)) *MockTargetManager_GetSealedSegmentsByCollection_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64), args[1].(int32))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_GetSealedSegmentsByCollection_Call) Return(_a0 map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByCollection_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_GetSealedSegmentsByCollection_Call) RunAndReturn(run func(int64, int32) map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByCollection_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// GetSealedSegmentsByPartition provides a mock function with given fields: collectionID, partitionID, scope
+func (_m *MockTargetManager) GetSealedSegmentsByPartition(collectionID int64, partitionID int64, scope int32) map[int64]*datapb.SegmentInfo {
+	ret := _m.Called(collectionID, partitionID, scope)
+
+	var r0 map[int64]*datapb.SegmentInfo
+	if rf, ok := ret.Get(0).(func(int64, int64, int32) map[int64]*datapb.SegmentInfo); ok {
+		r0 = rf(collectionID, partitionID, scope)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo)
+		}
+	}
+
+	return r0
+}
+
+// MockTargetManager_GetSealedSegmentsByPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSealedSegmentsByPartition'
+type MockTargetManager_GetSealedSegmentsByPartition_Call struct {
+	*mock.Call
+}
+
+// GetSealedSegmentsByPartition is a helper method to define mock.On call
+//   - collectionID int64
+//   - partitionID int64
+//   - scope int32
+func (_e *MockTargetManager_Expecter) GetSealedSegmentsByPartition(collectionID interface{}, partitionID interface{}, scope interface{}) *MockTargetManager_GetSealedSegmentsByPartition_Call {
+	return &MockTargetManager_GetSealedSegmentsByPartition_Call{Call: _e.mock.On("GetSealedSegmentsByPartition", collectionID, partitionID, scope)}
+}
+
+func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) Run(run func(collectionID int64, partitionID int64, scope int32)) *MockTargetManager_GetSealedSegmentsByPartition_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64), args[1].(int64), args[2].(int32))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) Return(_a0 map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByPartition_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) RunAndReturn(run func(int64, int64, int32) map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByPartition_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// IsCurrentTargetExist provides a mock function with given fields: collectionID
+func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64) bool {
+	ret := _m.Called(collectionID)
+
+	var r0 bool
+	if rf, ok := ret.Get(0).(func(int64) bool); ok {
+		r0 = rf(collectionID)
+	} else {
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
+// MockTargetManager_IsCurrentTargetExist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsCurrentTargetExist'
+type MockTargetManager_IsCurrentTargetExist_Call struct {
+	*mock.Call
+}
+
+// IsCurrentTargetExist is a helper method to define mock.On call
+//   - collectionID int64
+func (_e *MockTargetManager_Expecter) IsCurrentTargetExist(collectionID interface{}) *MockTargetManager_IsCurrentTargetExist_Call {
+	return &MockTargetManager_IsCurrentTargetExist_Call{Call: _e.mock.On("IsCurrentTargetExist", collectionID)}
+}
+
+func (_c *MockTargetManager_IsCurrentTargetExist_Call) Run(run func(collectionID int64)) *MockTargetManager_IsCurrentTargetExist_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_IsCurrentTargetExist_Call) Return(_a0 bool) *MockTargetManager_IsCurrentTargetExist_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_IsCurrentTargetExist_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// IsNextTargetExist provides a mock function with given fields: collectionID
+func (_m *MockTargetManager) IsNextTargetExist(collectionID int64) bool {
+	ret := _m.Called(collectionID)
+
+	var r0 bool
+	if rf, ok := ret.Get(0).(func(int64) bool); ok {
+		r0 = rf(collectionID)
+	} else {
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
+// MockTargetManager_IsNextTargetExist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsNextTargetExist'
+type MockTargetManager_IsNextTargetExist_Call struct {
+	*mock.Call
+}
+
+// IsNextTargetExist is a helper method to define mock.On call
+//   - collectionID int64
+func (_e *MockTargetManager_Expecter) IsNextTargetExist(collectionID interface{}) *MockTargetManager_IsNextTargetExist_Call {
+	return &MockTargetManager_IsNextTargetExist_Call{Call: _e.mock.On("IsNextTargetExist", collectionID)}
+}
+
+func (_c *MockTargetManager_IsNextTargetExist_Call) Run(run func(collectionID int64)) *MockTargetManager_IsNextTargetExist_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_IsNextTargetExist_Call) Return(_a0 bool) *MockTargetManager_IsNextTargetExist_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_IsNextTargetExist_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_IsNextTargetExist_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// PullNextTargetV1 provides a mock function with given fields: broker, collectionID, chosenPartitionIDs
+func (_m *MockTargetManager) PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) {
+	_va := make([]interface{}, len(chosenPartitionIDs))
+	for _i := range chosenPartitionIDs {
+		_va[_i] = chosenPartitionIDs[_i]
+	}
+	var _ca []interface{}
+	_ca = append(_ca, broker, collectionID)
+	_ca = append(_ca, _va...)
+	ret := _m.Called(_ca...)
+
+	var r0 map[int64]*datapb.SegmentInfo
+	var r1 map[string]*DmChannel
+	var r2 error
+	if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)); ok {
+		return rf(broker, collectionID, chosenPartitionIDs...)
+	}
+	if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) map[int64]*datapb.SegmentInfo); ok {
+		r0 = rf(broker, collectionID, chosenPartitionIDs...)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(Broker, int64, ...int64) map[string]*DmChannel); ok {
+		r1 = rf(broker, collectionID, chosenPartitionIDs...)
+	} else {
+		if ret.Get(1) != nil {
+			r1 = ret.Get(1).(map[string]*DmChannel)
+		}
+	}
+
+	if rf, ok := ret.Get(2).(func(Broker, int64, ...int64) error); ok {
+		r2 = rf(broker, collectionID, chosenPartitionIDs...)
+	} else {
+		r2 = ret.Error(2)
+	}
+
+	return r0, r1, r2
+}
+
+// MockTargetManager_PullNextTargetV1_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullNextTargetV1'
+type MockTargetManager_PullNextTargetV1_Call struct {
+	*mock.Call
+}
+
+// PullNextTargetV1 is a helper method to define mock.On call
+//   - broker Broker
+//   - collectionID int64
+//   - chosenPartitionIDs ...int64
+func (_e *MockTargetManager_Expecter) PullNextTargetV1(broker interface{}, collectionID interface{}, chosenPartitionIDs ...interface{}) *MockTargetManager_PullNextTargetV1_Call {
+	return &MockTargetManager_PullNextTargetV1_Call{Call: _e.mock.On("PullNextTargetV1",
+		append([]interface{}{broker, collectionID}, chosenPartitionIDs...)...)}
+}
+
+func (_c *MockTargetManager_PullNextTargetV1_Call) Run(run func(broker Broker, collectionID int64, chosenPartitionIDs ...int64)) *MockTargetManager_PullNextTargetV1_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		variadicArgs := make([]int64, len(args)-2)
+		for i, a := range args[2:] {
+			if a != nil {
+				variadicArgs[i] = a.(int64)
+			}
+		}
+		run(args[0].(Broker), args[1].(int64), variadicArgs...)
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_PullNextTargetV1_Call) Return(_a0 map[int64]*datapb.SegmentInfo, _a1 map[string]*DmChannel, _a2 error) *MockTargetManager_PullNextTargetV1_Call {
+	_c.Call.Return(_a0, _a1, _a2)
+	return _c
+}
+
+func (_c *MockTargetManager_PullNextTargetV1_Call) RunAndReturn(run func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)) *MockTargetManager_PullNextTargetV1_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// PullNextTargetV2 provides a mock function with given fields: broker, collectionID, chosenPartitionIDs
+func (_m *MockTargetManager) PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) {
+	_va := make([]interface{}, len(chosenPartitionIDs))
+	for _i := range chosenPartitionIDs {
+		_va[_i] = chosenPartitionIDs[_i]
+	}
+	var _ca []interface{}
+	_ca = append(_ca, broker, collectionID)
+	_ca = append(_ca, _va...)
+	ret := _m.Called(_ca...)
+
+	var r0 map[int64]*datapb.SegmentInfo
+	var r1 map[string]*DmChannel
+	var r2 error
+	if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)); ok {
+		return rf(broker, collectionID, chosenPartitionIDs...)
+	}
+	if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) map[int64]*datapb.SegmentInfo); ok {
+		r0 = rf(broker, collectionID, chosenPartitionIDs...)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(Broker, int64, ...int64) map[string]*DmChannel); ok {
+		r1 = rf(broker, collectionID, chosenPartitionIDs...)
+	} else {
+		if ret.Get(1) != nil {
+			r1 = ret.Get(1).(map[string]*DmChannel)
+		}
+	}
+
+	if rf, ok := ret.Get(2).(func(Broker, int64, ...int64) error); ok {
+		r2 = rf(broker, collectionID, chosenPartitionIDs...)
+	} else {
+		r2 = ret.Error(2)
+	}
+
+	return r0, r1, r2
+}
+
+// MockTargetManager_PullNextTargetV2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullNextTargetV2'
+type MockTargetManager_PullNextTargetV2_Call struct {
+	*mock.Call
+}
+
+// PullNextTargetV2 is a helper method to define mock.On call
+//   - broker Broker
+//   - collectionID int64
+//   - chosenPartitionIDs ...int64
+func (_e *MockTargetManager_Expecter) PullNextTargetV2(broker interface{}, collectionID interface{}, chosenPartitionIDs ...interface{}) *MockTargetManager_PullNextTargetV2_Call {
+	return &MockTargetManager_PullNextTargetV2_Call{Call: _e.mock.On("PullNextTargetV2",
+		append([]interface{}{broker, collectionID}, chosenPartitionIDs...)...)}
+}
+
+func (_c *MockTargetManager_PullNextTargetV2_Call) Run(run func(broker Broker, collectionID int64, chosenPartitionIDs ...int64)) *MockTargetManager_PullNextTargetV2_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		variadicArgs := make([]int64, len(args)-2)
+		for i, a := range args[2:] {
+			if a != nil {
+				variadicArgs[i] = a.(int64)
+			}
+		}
+		run(args[0].(Broker), args[1].(int64), variadicArgs...)
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_PullNextTargetV2_Call) Return(_a0 map[int64]*datapb.SegmentInfo, _a1 map[string]*DmChannel, _a2 error) *MockTargetManager_PullNextTargetV2_Call {
+	_c.Call.Return(_a0, _a1, _a2)
+	return _c
+}
+
+func (_c *MockTargetManager_PullNextTargetV2_Call) RunAndReturn(run func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)) *MockTargetManager_PullNextTargetV2_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// Recover provides a mock function with given fields: catalog
+func (_m *MockTargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
+	ret := _m.Called(catalog)
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(metastore.QueryCoordCatalog) error); ok {
+		r0 = rf(catalog)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// MockTargetManager_Recover_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Recover'
+type MockTargetManager_Recover_Call struct {
+	*mock.Call
+}
+
+// Recover is a helper method to define mock.On call
+//   - catalog metastore.QueryCoordCatalog
+func (_e *MockTargetManager_Expecter) Recover(catalog interface{}) *MockTargetManager_Recover_Call {
+	return &MockTargetManager_Recover_Call{Call: _e.mock.On("Recover", catalog)}
+}
+
+func (_c *MockTargetManager_Recover_Call) Run(run func(catalog metastore.QueryCoordCatalog)) *MockTargetManager_Recover_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(metastore.QueryCoordCatalog))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_Recover_Call) Return(_a0 error) *MockTargetManager_Recover_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_Recover_Call) RunAndReturn(run func(metastore.QueryCoordCatalog) error) *MockTargetManager_Recover_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// RemoveCollection provides a mock function with given fields: collectionID
+func (_m *MockTargetManager) RemoveCollection(collectionID int64) {
+	_m.Called(collectionID)
+}
+
+// MockTargetManager_RemoveCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveCollection'
+type MockTargetManager_RemoveCollection_Call struct {
+	*mock.Call
+}
+
+// RemoveCollection is a helper method to define mock.On call
+//   - collectionID int64
+func (_e *MockTargetManager_Expecter) RemoveCollection(collectionID interface{}) *MockTargetManager_RemoveCollection_Call {
+	return &MockTargetManager_RemoveCollection_Call{Call: _e.mock.On("RemoveCollection", collectionID)}
+}
+
+func (_c *MockTargetManager_RemoveCollection_Call) Run(run func(collectionID int64)) *MockTargetManager_RemoveCollection_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_RemoveCollection_Call) Return() *MockTargetManager_RemoveCollection_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockTargetManager_RemoveCollection_Call) RunAndReturn(run func(int64)) *MockTargetManager_RemoveCollection_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// RemovePartition provides a mock function with given fields: collectionID, partitionIDs
+func (_m *MockTargetManager) RemovePartition(collectionID int64, partitionIDs ...int64) {
+	_va := make([]interface{}, len(partitionIDs))
+	for _i := range partitionIDs {
+		_va[_i] = partitionIDs[_i]
+	}
+	var _ca []interface{}
+	_ca = append(_ca, collectionID)
+	_ca = append(_ca, _va...)
+	_m.Called(_ca...)
+}
+
+// MockTargetManager_RemovePartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemovePartition'
+type MockTargetManager_RemovePartition_Call struct {
+	*mock.Call
+}
+
+// RemovePartition is a helper method to define mock.On call
+//   - collectionID int64
+//   - partitionIDs ...int64
+func (_e *MockTargetManager_Expecter) RemovePartition(collectionID interface{}, partitionIDs ...interface{}) *MockTargetManager_RemovePartition_Call {
+	return &MockTargetManager_RemovePartition_Call{Call: _e.mock.On("RemovePartition",
+		append([]interface{}{collectionID}, partitionIDs...)...)}
+}
+
+func (_c *MockTargetManager_RemovePartition_Call) Run(run func(collectionID int64, partitionIDs ...int64)) *MockTargetManager_RemovePartition_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		variadicArgs := make([]int64, len(args)-1)
+		for i, a := range args[1:] {
+			if a != nil {
+				variadicArgs[i] = a.(int64)
+			}
+		}
+		run(args[0].(int64), variadicArgs...)
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_RemovePartition_Call) Return() *MockTargetManager_RemovePartition_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockTargetManager_RemovePartition_Call) RunAndReturn(run func(int64, ...int64)) *MockTargetManager_RemovePartition_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// SaveCurrentTarget provides a mock function with given fields: catalog
+func (_m *MockTargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) {
+	_m.Called(catalog)
+}
+
+// MockTargetManager_SaveCurrentTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveCurrentTarget'
+type MockTargetManager_SaveCurrentTarget_Call struct {
+	*mock.Call
+}
+
+// SaveCurrentTarget is a helper method to define mock.On call
+//   - catalog metastore.QueryCoordCatalog
+func (_e *MockTargetManager_Expecter) SaveCurrentTarget(catalog interface{}) *MockTargetManager_SaveCurrentTarget_Call {
+	return &MockTargetManager_SaveCurrentTarget_Call{Call: _e.mock.On("SaveCurrentTarget", catalog)}
+}
+
+func (_c *MockTargetManager_SaveCurrentTarget_Call) Run(run func(catalog metastore.QueryCoordCatalog)) *MockTargetManager_SaveCurrentTarget_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(metastore.QueryCoordCatalog))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_SaveCurrentTarget_Call) Return() *MockTargetManager_SaveCurrentTarget_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockTargetManager_SaveCurrentTarget_Call) RunAndReturn(run func(metastore.QueryCoordCatalog)) *MockTargetManager_SaveCurrentTarget_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// UpdateCollectionCurrentTarget provides a mock function with given fields: collectionID
+func (_m *MockTargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool {
+	ret := _m.Called(collectionID)
+
+	var r0 bool
+	if rf, ok := ret.Get(0).(func(int64) bool); ok {
+		r0 = rf(collectionID)
+	} else {
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
+// MockTargetManager_UpdateCollectionCurrentTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateCollectionCurrentTarget'
+type MockTargetManager_UpdateCollectionCurrentTarget_Call struct {
+	*mock.Call
+}
+
+// UpdateCollectionCurrentTarget is a helper method to define mock.On call
+//   - collectionID int64
+func (_e *MockTargetManager_Expecter) UpdateCollectionCurrentTarget(collectionID interface{}) *MockTargetManager_UpdateCollectionCurrentTarget_Call {
+	return &MockTargetManager_UpdateCollectionCurrentTarget_Call{Call: _e.mock.On("UpdateCollectionCurrentTarget", collectionID)}
+}
+
+func (_c *MockTargetManager_UpdateCollectionCurrentTarget_Call) Run(run func(collectionID int64)) *MockTargetManager_UpdateCollectionCurrentTarget_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_UpdateCollectionCurrentTarget_Call) Return(_a0 bool) *MockTargetManager_UpdateCollectionCurrentTarget_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_UpdateCollectionCurrentTarget_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_UpdateCollectionCurrentTarget_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// UpdateCollectionNextTarget provides a mock function with given fields: collectionID
+func (_m *MockTargetManager) UpdateCollectionNextTarget(collectionID int64) error {
+	ret := _m.Called(collectionID)
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(int64) error); ok {
+		r0 = rf(collectionID)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// MockTargetManager_UpdateCollectionNextTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateCollectionNextTarget'
+type MockTargetManager_UpdateCollectionNextTarget_Call struct {
+	*mock.Call
+}
+
+// UpdateCollectionNextTarget is a helper method to define mock.On call
+//   - collectionID int64
+func (_e *MockTargetManager_Expecter) UpdateCollectionNextTarget(collectionID interface{}) *MockTargetManager_UpdateCollectionNextTarget_Call {
+	return &MockTargetManager_UpdateCollectionNextTarget_Call{Call: _e.mock.On("UpdateCollectionNextTarget", collectionID)}
+}
+
+func (_c *MockTargetManager_UpdateCollectionNextTarget_Call) Run(run func(collectionID int64)) *MockTargetManager_UpdateCollectionNextTarget_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int64))
+	})
+	return _c
+}
+
+func (_c *MockTargetManager_UpdateCollectionNextTarget_Call) Return(_a0 error) *MockTargetManager_UpdateCollectionNextTarget_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockTargetManager_UpdateCollectionNextTarget_Call) RunAndReturn(run func(int64) error) *MockTargetManager_UpdateCollectionNextTarget_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// NewMockTargetManager creates a new instance of MockTargetManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewMockTargetManager(t interface {
+	mock.TestingT
+	Cleanup(func())
+}) *MockTargetManager {
+	mock := &MockTargetManager{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go
index 6808e6ca9a..376fd37569 100644
--- a/internal/querycoordv2/meta/target_manager.go
+++ b/internal/querycoordv2/meta/target_manager.go
@@ -50,6 +50,29 @@ const (
 	NextTargetFirst
 )
 
+type TargetManagerInterface interface {
+	UpdateCollectionCurrentTarget(collectionID int64) bool
+	UpdateCollectionNextTarget(collectionID int64) error
+	PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)
+	PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)
+	RemoveCollection(collectionID int64)
+	RemovePartition(collectionID int64, partitionIDs ...int64)
+	GetGrowingSegmentsByCollection(collectionID int64, scope TargetScope) typeutil.UniqueSet
+	GetGrowingSegmentsByChannel(collectionID int64, channelName string, scope TargetScope) typeutil.UniqueSet
+	GetSealedSegmentsByCollection(collectionID int64, scope TargetScope) map[int64]*datapb.SegmentInfo
+	GetSealedSegmentsByChannel(collectionID int64, channelName string, scope TargetScope) map[int64]*datapb.SegmentInfo
+	GetDroppedSegmentsByChannel(collectionID int64, channelName string, scope TargetScope) []int64
+	GetSealedSegmentsByPartition(collectionID int64, partitionID int64, scope TargetScope) map[int64]*datapb.SegmentInfo
+	GetDmChannelsByCollection(collectionID int64, scope TargetScope) map[string]*DmChannel
+	GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel
+	GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo
+	GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64
+	IsCurrentTargetExist(collectionID int64) bool
+	IsNextTargetExist(collectionID int64) bool
+	SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
+	Recover(catalog metastore.QueryCoordCatalog) error
+}
+
 type TargetManager struct {
 	rwMutex sync.RWMutex
 	broker  Broker
diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go
index 1f26770f46..326da3ef8d 100644
--- a/internal/querynodev2/server.go
+++ b/internal/querynodev2/server.go
@@ -67,6 +67,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/gc"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
 	"github.com/milvus-io/milvus/pkg/util/lifetime"
+	"github.com/milvus-io/milvus/pkg/util/lock"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -130,6 +131,10 @@ type QueryNode struct {
 
 	// parameter turning hook
 	queryHook optimizers.QueryHook
+
+	// record the last modify ts of segment/channel distribution
+	lastModifyLock lock.RWMutex
+	lastModifyTs   int64
 }
 
 // NewQueryNode will return a QueryNode with abnormal state.
diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go
index c5b58cca05..d1e5b8a5d4 100644
--- a/internal/querynodev2/services.go
+++ b/internal/querynodev2/services.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"strconv"
 	"sync"
+	"time"
 
 	"github.com/golang/protobuf/proto"
 	"github.com/samber/lo"
@@ -190,6 +191,8 @@ func (node *QueryNode) composeIndexMeta(indexInfos []*indexpb.IndexInfo, schema
 
 // WatchDmChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query
 func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (status *commonpb.Status, e error) {
+	defer node.updateDistributionModifyTS()
+
 	channel := req.GetInfos()[0]
 	log := log.Ctx(ctx).With(
 		zap.Int64("collectionID", req.GetCollectionID()),
@@ -339,6 +342,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
 }
 
 func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannelRequest) (*commonpb.Status, error) {
+	defer node.updateDistributionModifyTS()
 	log := log.Ctx(ctx).With(
 		zap.Int64("collectionID", req.GetCollectionID()),
 		zap.String("channel", req.GetChannelName()),
@@ -396,6 +400,7 @@ func (node *QueryNode) LoadPartitions(ctx context.Context, req *querypb.LoadPart
 
 // LoadSegments load historical data into query node, historical data can be vector data or index
 func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
+	defer node.updateDistributionModifyTS()
 	segment := req.GetInfos()[0]
 
 	log := log.Ctx(ctx).With(
@@ -527,6 +532,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, req *querypb.Relea
 
 // ReleaseSegments remove the specified segments from query node according segmentIDs, partitionIDs, and collectionID
 func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
+	defer node.updateDistributionModifyTS()
 	log := log.Ctx(ctx).With(
 		zap.Int64("collectionID", req.GetCollectionID()),
 		zap.String("shard", req.GetShard()),
@@ -1174,6 +1180,23 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
 	}
 	defer node.lifetime.Done()
 
+	lastModifyTs := node.getDistributionModifyTS()
+	distributionChange := func() bool {
+		if req.GetLastUpdateTs() == 0 {
+			return true
+		}
+
+		return req.GetLastUpdateTs() < lastModifyTs
+	}
+
+	if !distributionChange() {
+		return &querypb.GetDataDistributionResponse{
+			Status:       merr.Success(),
+			NodeID:       node.GetNodeID(),
+			LastModifyTs: lastModifyTs,
+		}, nil
+	}
+
 	sealedSegments := node.manager.Segment.GetBy(segments.WithType(commonpb.SegmentState_Sealed))
 	segmentVersionInfos := make([]*querypb.SegmentVersionInfo, 0, len(sealedSegments))
 	for _, s := range sealedSegments {
@@ -1239,15 +1262,18 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
 	})
 
 	return &querypb.GetDataDistributionResponse{
-		Status:      merr.Success(),
-		NodeID:      node.GetNodeID(),
-		Segments:    segmentVersionInfos,
-		Channels:    channelVersionInfos,
-		LeaderViews: leaderViews,
+		Status:       merr.Success(),
+		NodeID:       node.GetNodeID(),
+		Segments:     segmentVersionInfos,
+		Channels:     channelVersionInfos,
+		LeaderViews:  leaderViews,
+		LastModifyTs: lastModifyTs,
 	}, nil
 }
 
 func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) {
+	defer node.updateDistributionModifyTS()
+
 	log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()),
 		zap.String("channel", req.GetChannel()), zap.Int64("currentNodeID", node.GetNodeID()))
 	// check node healthy
@@ -1402,3 +1428,16 @@ func (req *deleteRequestStringer) String() string {
 	tss := req.GetTimestamps()
 	return fmt.Sprintf("%s, timestamp range: [%d-%d]", pkInfo, tss[0], tss[len(tss)-1])
 }
+
+func (node *QueryNode) updateDistributionModifyTS() {
+	node.lastModifyLock.Lock()
+	defer node.lastModifyLock.Unlock()
+
+	node.lastModifyTs = time.Now().UnixNano()
+}
+
+func (node *QueryNode) getDistributionModifyTS() int64 {
+	node.lastModifyLock.RLock()
+	defer node.lastModifyLock.RUnlock()
+	return node.lastModifyTs
+}