enhance: Skip return data distribution if no change happen (#32814)

issue: #32813

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/33111/head
wei liu 2024-05-17 10:11:37 +08:00 committed by GitHub
parent 5b27a0cd2c
commit f1c9986974
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 1231 additions and 41 deletions

View File

@ -428,6 +428,7 @@ generate-mockery-proxy: getdeps
generate-mockery-querycoord: getdeps
$(INSTALL_PATH)/mockery --name=QueryNodeServer --dir=$(PWD)/internal/proto/querypb/ --output=$(PWD)/internal/querycoordv2/mocks --filename=mock_querynode.go --with-expecter --structname=MockQueryNodeServer
$(INSTALL_PATH)/mockery --name=Broker --dir=$(PWD)/internal/querycoordv2/meta --output=$(PWD)/internal/querycoordv2/meta --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=meta
$(INSTALL_PATH)/mockery --name=TargetManagerInterface --dir=$(PWD)/internal/querycoordv2/meta --output=$(PWD)/internal/querycoordv2/meta --filename=mock_target_manager.go --with-expecter --structname=MockTargetManager --inpackage
$(INSTALL_PATH)/mockery --name=Scheduler --dir=$(PWD)/internal/querycoordv2/task --output=$(PWD)/internal/querycoordv2/task --filename=mock_scheduler.go --with-expecter --structname=MockScheduler --outpkg=task --inpackage
$(INSTALL_PATH)/mockery --name=Cluster --dir=$(PWD)/internal/querycoordv2/session --output=$(PWD)/internal/querycoordv2/session --filename=mock_cluster.go --with-expecter --structname=MockCluster --outpkg=session --inpackage
$(INSTALL_PATH)/mockery --name=Balance --dir=$(PWD)/internal/querycoordv2/balance --output=$(PWD)/internal/querycoordv2/balance --filename=mock_balancer.go --with-expecter --structname=MockBalancer --outpkg=balance --inpackage

View File

@ -589,6 +589,7 @@ message SealedSegmentsChangeInfo {
message GetDataDistributionRequest {
common.MsgBase base = 1;
map<string, msg.MsgPosition> checkpoints = 2;
int64 lastUpdateTs = 3;
}
message GetDataDistributionResponse {
@ -597,6 +598,7 @@ message GetDataDistributionResponse {
repeated SegmentVersionInfo segments = 3;
repeated ChannelVersionInfo channels = 4;
repeated LeaderView leader_views = 5;
int64 lastModifyTs = 6;
}
message LeaderView {

View File

@ -48,6 +48,8 @@ type DistControllerTestSuite struct {
kv kv.MetaKv
meta *meta.Meta
broker *meta.MockBroker
nodeMgr *session.NodeManager
}
func (suite *DistControllerTestSuite) SetupTest() {
@ -69,15 +71,16 @@ func (suite *DistControllerTestSuite) SetupTest() {
// meta
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager())
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
suite.mockCluster = session.NewMockCluster(suite.T())
nodeManager := session.NewNodeManager()
distManager := meta.NewDistributionManager()
suite.broker = meta.NewMockBroker(suite.T())
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.controller = NewDistController(suite.mockCluster, nodeManager, distManager, targetManager, suite.mockScheduler)
suite.controller = NewDistController(suite.mockCluster, suite.nodeMgr, distManager, targetManager, suite.mockScheduler)
}
func (suite *DistControllerTestSuite) TearDownSuite() {
@ -85,6 +88,11 @@ func (suite *DistControllerTestSuite) TearDownSuite() {
}
func (suite *DistControllerTestSuite) TestStart() {
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
dispatchCalled := atomic.NewBool(false)
suite.mockCluster.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(
&querypb.GetDataDistributionResponse{Status: merr.Success(), NodeID: 1},
@ -133,6 +141,17 @@ func (suite *DistControllerTestSuite) TestStop() {
}
func (suite *DistControllerTestSuite) TestSyncAll() {
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
suite.controller.StartDistInstance(context.TODO(), 1)
suite.controller.StartDistInstance(context.TODO(), 2)

View File

@ -26,7 +26,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -40,16 +39,17 @@ import (
)
type distHandler struct {
nodeID int64
c chan struct{}
wg sync.WaitGroup
client session.Cluster
nodeManager *session.NodeManager
scheduler task.Scheduler
dist *meta.DistributionManager
target *meta.TargetManager
mu sync.Mutex
stopOnce sync.Once
nodeID int64
c chan struct{}
wg sync.WaitGroup
client session.Cluster
nodeManager *session.NodeManager
scheduler task.Scheduler
dist *meta.DistributionManager
target meta.TargetManagerInterface
mu sync.Mutex
stopOnce sync.Once
lastUpdateTs int64
}
func (dh *distHandler) start(ctx context.Context) {
@ -87,21 +87,31 @@ func (dh *distHandler) start(ctx context.Context) {
func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse) {
node := dh.nodeManager.Get(resp.GetNodeID())
if node != nil {
if node == nil {
return
}
if time.Since(node.LastHeartbeat()) > paramtable.Get().QueryCoordCfg.HeartBeatWarningLag.GetAsDuration(time.Millisecond) {
log.Warn("node last heart beat time lag too behind", zap.Time("now", time.Now()),
zap.Time("lastHeartBeatTime", node.LastHeartbeat()), zap.Int64("nodeID", node.ID()))
}
node.SetLastHeartbeat(time.Now())
// skip update dist if no distribution change happens in query node
if resp.GetLastModifyTs() <= dh.lastUpdateTs {
log.RatedInfo(30, "skip update dist due to no distribution change", zap.Int64("lastModifyTs", resp.GetLastModifyTs()), zap.Int64("lastUpdateTs", dh.lastUpdateTs))
} else {
dh.lastUpdateTs = resp.GetLastModifyTs()
node.UpdateStats(
session.WithSegmentCnt(len(resp.GetSegments())),
session.WithChannelCnt(len(resp.GetChannels())),
)
if time.Since(node.LastHeartbeat()) > paramtable.Get().QueryCoordCfg.HeartBeatWarningLag.GetAsDuration(time.Millisecond) {
log.Warn("node last heart beat time lag too behind", zap.Time("now", time.Now()),
zap.Time("lastHeartBeatTime", node.LastHeartbeat()), zap.Int64("nodeID", node.ID()))
}
node.SetLastHeartbeat(time.Now())
}
dh.updateSegmentsDistribution(resp)
dh.updateChannelsDistribution(resp)
dh.updateLeaderView(resp)
dh.updateSegmentsDistribution(resp)
dh.updateChannelsDistribution(resp)
dh.updateLeaderView(resp)
}
dh.scheduler.Dispatch(dh.nodeID)
}
@ -214,23 +224,13 @@ func (dh *distHandler) getDistribution(ctx context.Context) (*querypb.GetDataDis
dh.mu.Lock()
defer dh.mu.Unlock()
channels := make(map[string]*msgpb.MsgPosition)
for _, channel := range dh.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(dh.nodeID)) {
targetChannel := dh.target.GetDmChannel(channel.GetCollectionID(), channel.GetChannelName(), meta.CurrentTarget)
if targetChannel == nil {
continue
}
channels[channel.GetChannelName()] = targetChannel.GetSeekPosition()
}
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.DistributionRequestTimeout.GetAsDuration(time.Millisecond))
defer cancel()
resp, err := dh.client.GetDataDistribution(ctx, dh.nodeID, &querypb.GetDataDistributionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_GetDistribution),
),
Checkpoints: channels,
LastUpdateTs: dh.lastUpdateTs,
})
if err != nil {
return nil, err
@ -259,7 +259,7 @@ func newDistHandler(
nodeManager *session.NodeManager,
scheduler task.Scheduler,
dist *meta.DistributionManager,
targetMgr *meta.TargetManager,
targetMgr meta.TargetManagerInterface,
) *distHandler {
h := &distHandler{
nodeID: nodeID,

View File

@ -0,0 +1,126 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dist
import (
"context"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type DistHandlerSuite struct {
suite.Suite
ctx context.Context
meta *meta.Meta
broker *meta.MockBroker
nodeID int64
client *session.MockCluster
nodeManager *session.NodeManager
scheduler *task.MockScheduler
dist *meta.DistributionManager
target *meta.MockTargetManager
handler *distHandler
}
func (suite *DistHandlerSuite) SetupSuite() {
paramtable.Init()
suite.nodeID = 1
suite.client = session.NewMockCluster(suite.T())
suite.nodeManager = session.NewNodeManager()
suite.scheduler = task.NewMockScheduler(suite.T())
suite.dist = meta.NewDistributionManager()
suite.target = meta.NewMockTargetManager(suite.T())
suite.ctx = context.Background()
suite.scheduler.EXPECT().Dispatch(mock.Anything).Maybe()
suite.target.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
suite.target.EXPECT().GetDmChannel(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
}
func (suite *DistHandlerSuite) TestBasic() {
suite.nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.client.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{
Status: merr.Success(),
NodeID: 1,
Channels: []*querypb.ChannelVersionInfo{
{
Channel: "test-channel-1",
Collection: 1,
Version: 1,
},
},
Segments: []*querypb.SegmentVersionInfo{
{
ID: 1,
Collection: 1,
Partition: 1,
Channel: "test-channel-1",
Version: 1,
},
},
LeaderViews: []*querypb.LeaderView{
{
Collection: 1,
Channel: "test-channel-1",
},
},
LastModifyTs: 1,
}, nil)
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target)
defer suite.handler.stop()
time.Sleep(10 * time.Second)
}
func (suite *DistHandlerSuite) TestGetDistributionFailed() {
suite.nodeManager.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.client.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fake error"))
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target)
defer suite.handler.stop()
time.Sleep(10 * time.Second)
}
func TestDistHandlerSuite(t *testing.T) {
suite.Run(t, new(DistHandlerSuite))
}

View File

@ -0,0 +1,975 @@
// Code generated by mockery v2.32.4. DO NOT EDIT.
package meta
import (
metastore "github.com/milvus-io/milvus/internal/metastore"
datapb "github.com/milvus-io/milvus/internal/proto/datapb"
mock "github.com/stretchr/testify/mock"
typeutil "github.com/milvus-io/milvus/pkg/util/typeutil"
)
// MockTargetManager is an autogenerated mock type for the TargetManagerInterface type
type MockTargetManager struct {
mock.Mock
}
type MockTargetManager_Expecter struct {
mock *mock.Mock
}
func (_m *MockTargetManager) EXPECT() *MockTargetManager_Expecter {
return &MockTargetManager_Expecter{mock: &_m.Mock}
}
// GetCollectionTargetVersion provides a mock function with given fields: collectionID, scope
func (_m *MockTargetManager) GetCollectionTargetVersion(collectionID int64, scope int32) int64 {
ret := _m.Called(collectionID, scope)
var r0 int64
if rf, ok := ret.Get(0).(func(int64, int32) int64); ok {
r0 = rf(collectionID, scope)
} else {
r0 = ret.Get(0).(int64)
}
return r0
}
// MockTargetManager_GetCollectionTargetVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCollectionTargetVersion'
type MockTargetManager_GetCollectionTargetVersion_Call struct {
*mock.Call
}
// GetCollectionTargetVersion is a helper method to define mock.On call
// - collectionID int64
// - scope int32
func (_e *MockTargetManager_Expecter) GetCollectionTargetVersion(collectionID interface{}, scope interface{}) *MockTargetManager_GetCollectionTargetVersion_Call {
return &MockTargetManager_GetCollectionTargetVersion_Call{Call: _e.mock.On("GetCollectionTargetVersion", collectionID, scope)}
}
func (_c *MockTargetManager_GetCollectionTargetVersion_Call) Run(run func(collectionID int64, scope int32)) *MockTargetManager_GetCollectionTargetVersion_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int32))
})
return _c
}
func (_c *MockTargetManager_GetCollectionTargetVersion_Call) Return(_a0 int64) *MockTargetManager_GetCollectionTargetVersion_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_GetCollectionTargetVersion_Call) RunAndReturn(run func(int64, int32) int64) *MockTargetManager_GetCollectionTargetVersion_Call {
_c.Call.Return(run)
return _c
}
// GetDmChannel provides a mock function with given fields: collectionID, channel, scope
func (_m *MockTargetManager) GetDmChannel(collectionID int64, channel string, scope int32) *DmChannel {
ret := _m.Called(collectionID, channel, scope)
var r0 *DmChannel
if rf, ok := ret.Get(0).(func(int64, string, int32) *DmChannel); ok {
r0 = rf(collectionID, channel, scope)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*DmChannel)
}
}
return r0
}
// MockTargetManager_GetDmChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDmChannel'
type MockTargetManager_GetDmChannel_Call struct {
*mock.Call
}
// GetDmChannel is a helper method to define mock.On call
// - collectionID int64
// - channel string
// - scope int32
func (_e *MockTargetManager_Expecter) GetDmChannel(collectionID interface{}, channel interface{}, scope interface{}) *MockTargetManager_GetDmChannel_Call {
return &MockTargetManager_GetDmChannel_Call{Call: _e.mock.On("GetDmChannel", collectionID, channel, scope)}
}
func (_c *MockTargetManager_GetDmChannel_Call) Run(run func(collectionID int64, channel string, scope int32)) *MockTargetManager_GetDmChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(string), args[2].(int32))
})
return _c
}
func (_c *MockTargetManager_GetDmChannel_Call) Return(_a0 *DmChannel) *MockTargetManager_GetDmChannel_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_GetDmChannel_Call) RunAndReturn(run func(int64, string, int32) *DmChannel) *MockTargetManager_GetDmChannel_Call {
_c.Call.Return(run)
return _c
}
// GetDmChannelsByCollection provides a mock function with given fields: collectionID, scope
func (_m *MockTargetManager) GetDmChannelsByCollection(collectionID int64, scope int32) map[string]*DmChannel {
ret := _m.Called(collectionID, scope)
var r0 map[string]*DmChannel
if rf, ok := ret.Get(0).(func(int64, int32) map[string]*DmChannel); ok {
r0 = rf(collectionID, scope)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[string]*DmChannel)
}
}
return r0
}
// MockTargetManager_GetDmChannelsByCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDmChannelsByCollection'
type MockTargetManager_GetDmChannelsByCollection_Call struct {
*mock.Call
}
// GetDmChannelsByCollection is a helper method to define mock.On call
// - collectionID int64
// - scope int32
func (_e *MockTargetManager_Expecter) GetDmChannelsByCollection(collectionID interface{}, scope interface{}) *MockTargetManager_GetDmChannelsByCollection_Call {
return &MockTargetManager_GetDmChannelsByCollection_Call{Call: _e.mock.On("GetDmChannelsByCollection", collectionID, scope)}
}
func (_c *MockTargetManager_GetDmChannelsByCollection_Call) Run(run func(collectionID int64, scope int32)) *MockTargetManager_GetDmChannelsByCollection_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int32))
})
return _c
}
func (_c *MockTargetManager_GetDmChannelsByCollection_Call) Return(_a0 map[string]*DmChannel) *MockTargetManager_GetDmChannelsByCollection_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_GetDmChannelsByCollection_Call) RunAndReturn(run func(int64, int32) map[string]*DmChannel) *MockTargetManager_GetDmChannelsByCollection_Call {
_c.Call.Return(run)
return _c
}
// GetDroppedSegmentsByChannel provides a mock function with given fields: collectionID, channelName, scope
func (_m *MockTargetManager) GetDroppedSegmentsByChannel(collectionID int64, channelName string, scope int32) []int64 {
ret := _m.Called(collectionID, channelName, scope)
var r0 []int64
if rf, ok := ret.Get(0).(func(int64, string, int32) []int64); ok {
r0 = rf(collectionID, channelName, scope)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int64)
}
}
return r0
}
// MockTargetManager_GetDroppedSegmentsByChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDroppedSegmentsByChannel'
type MockTargetManager_GetDroppedSegmentsByChannel_Call struct {
*mock.Call
}
// GetDroppedSegmentsByChannel is a helper method to define mock.On call
// - collectionID int64
// - channelName string
// - scope int32
func (_e *MockTargetManager_Expecter) GetDroppedSegmentsByChannel(collectionID interface{}, channelName interface{}, scope interface{}) *MockTargetManager_GetDroppedSegmentsByChannel_Call {
return &MockTargetManager_GetDroppedSegmentsByChannel_Call{Call: _e.mock.On("GetDroppedSegmentsByChannel", collectionID, channelName, scope)}
}
func (_c *MockTargetManager_GetDroppedSegmentsByChannel_Call) Run(run func(collectionID int64, channelName string, scope int32)) *MockTargetManager_GetDroppedSegmentsByChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(string), args[2].(int32))
})
return _c
}
func (_c *MockTargetManager_GetDroppedSegmentsByChannel_Call) Return(_a0 []int64) *MockTargetManager_GetDroppedSegmentsByChannel_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_GetDroppedSegmentsByChannel_Call) RunAndReturn(run func(int64, string, int32) []int64) *MockTargetManager_GetDroppedSegmentsByChannel_Call {
_c.Call.Return(run)
return _c
}
// GetGrowingSegmentsByChannel provides a mock function with given fields: collectionID, channelName, scope
func (_m *MockTargetManager) GetGrowingSegmentsByChannel(collectionID int64, channelName string, scope int32) typeutil.Set[int64] {
ret := _m.Called(collectionID, channelName, scope)
var r0 typeutil.Set[int64]
if rf, ok := ret.Get(0).(func(int64, string, int32) typeutil.Set[int64]); ok {
r0 = rf(collectionID, channelName, scope)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(typeutil.Set[int64])
}
}
return r0
}
// MockTargetManager_GetGrowingSegmentsByChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetGrowingSegmentsByChannel'
type MockTargetManager_GetGrowingSegmentsByChannel_Call struct {
*mock.Call
}
// GetGrowingSegmentsByChannel is a helper method to define mock.On call
// - collectionID int64
// - channelName string
// - scope int32
func (_e *MockTargetManager_Expecter) GetGrowingSegmentsByChannel(collectionID interface{}, channelName interface{}, scope interface{}) *MockTargetManager_GetGrowingSegmentsByChannel_Call {
return &MockTargetManager_GetGrowingSegmentsByChannel_Call{Call: _e.mock.On("GetGrowingSegmentsByChannel", collectionID, channelName, scope)}
}
func (_c *MockTargetManager_GetGrowingSegmentsByChannel_Call) Run(run func(collectionID int64, channelName string, scope int32)) *MockTargetManager_GetGrowingSegmentsByChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(string), args[2].(int32))
})
return _c
}
func (_c *MockTargetManager_GetGrowingSegmentsByChannel_Call) Return(_a0 typeutil.Set[int64]) *MockTargetManager_GetGrowingSegmentsByChannel_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_GetGrowingSegmentsByChannel_Call) RunAndReturn(run func(int64, string, int32) typeutil.Set[int64]) *MockTargetManager_GetGrowingSegmentsByChannel_Call {
_c.Call.Return(run)
return _c
}
// GetGrowingSegmentsByCollection provides a mock function with given fields: collectionID, scope
func (_m *MockTargetManager) GetGrowingSegmentsByCollection(collectionID int64, scope int32) typeutil.Set[int64] {
ret := _m.Called(collectionID, scope)
var r0 typeutil.Set[int64]
if rf, ok := ret.Get(0).(func(int64, int32) typeutil.Set[int64]); ok {
r0 = rf(collectionID, scope)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(typeutil.Set[int64])
}
}
return r0
}
// MockTargetManager_GetGrowingSegmentsByCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetGrowingSegmentsByCollection'
type MockTargetManager_GetGrowingSegmentsByCollection_Call struct {
*mock.Call
}
// GetGrowingSegmentsByCollection is a helper method to define mock.On call
// - collectionID int64
// - scope int32
func (_e *MockTargetManager_Expecter) GetGrowingSegmentsByCollection(collectionID interface{}, scope interface{}) *MockTargetManager_GetGrowingSegmentsByCollection_Call {
return &MockTargetManager_GetGrowingSegmentsByCollection_Call{Call: _e.mock.On("GetGrowingSegmentsByCollection", collectionID, scope)}
}
func (_c *MockTargetManager_GetGrowingSegmentsByCollection_Call) Run(run func(collectionID int64, scope int32)) *MockTargetManager_GetGrowingSegmentsByCollection_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int32))
})
return _c
}
func (_c *MockTargetManager_GetGrowingSegmentsByCollection_Call) Return(_a0 typeutil.Set[int64]) *MockTargetManager_GetGrowingSegmentsByCollection_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_GetGrowingSegmentsByCollection_Call) RunAndReturn(run func(int64, int32) typeutil.Set[int64]) *MockTargetManager_GetGrowingSegmentsByCollection_Call {
_c.Call.Return(run)
return _c
}
// GetSealedSegment provides a mock function with given fields: collectionID, id, scope
func (_m *MockTargetManager) GetSealedSegment(collectionID int64, id int64, scope int32) *datapb.SegmentInfo {
ret := _m.Called(collectionID, id, scope)
var r0 *datapb.SegmentInfo
if rf, ok := ret.Get(0).(func(int64, int64, int32) *datapb.SegmentInfo); ok {
r0 = rf(collectionID, id, scope)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*datapb.SegmentInfo)
}
}
return r0
}
// MockTargetManager_GetSealedSegment_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSealedSegment'
type MockTargetManager_GetSealedSegment_Call struct {
*mock.Call
}
// GetSealedSegment is a helper method to define mock.On call
// - collectionID int64
// - id int64
// - scope int32
func (_e *MockTargetManager_Expecter) GetSealedSegment(collectionID interface{}, id interface{}, scope interface{}) *MockTargetManager_GetSealedSegment_Call {
return &MockTargetManager_GetSealedSegment_Call{Call: _e.mock.On("GetSealedSegment", collectionID, id, scope)}
}
func (_c *MockTargetManager_GetSealedSegment_Call) Run(run func(collectionID int64, id int64, scope int32)) *MockTargetManager_GetSealedSegment_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64), args[2].(int32))
})
return _c
}
func (_c *MockTargetManager_GetSealedSegment_Call) Return(_a0 *datapb.SegmentInfo) *MockTargetManager_GetSealedSegment_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_GetSealedSegment_Call) RunAndReturn(run func(int64, int64, int32) *datapb.SegmentInfo) *MockTargetManager_GetSealedSegment_Call {
_c.Call.Return(run)
return _c
}
// GetSealedSegmentsByChannel provides a mock function with given fields: collectionID, channelName, scope
func (_m *MockTargetManager) GetSealedSegmentsByChannel(collectionID int64, channelName string, scope int32) map[int64]*datapb.SegmentInfo {
ret := _m.Called(collectionID, channelName, scope)
var r0 map[int64]*datapb.SegmentInfo
if rf, ok := ret.Get(0).(func(int64, string, int32) map[int64]*datapb.SegmentInfo); ok {
r0 = rf(collectionID, channelName, scope)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo)
}
}
return r0
}
// MockTargetManager_GetSealedSegmentsByChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSealedSegmentsByChannel'
type MockTargetManager_GetSealedSegmentsByChannel_Call struct {
*mock.Call
}
// GetSealedSegmentsByChannel is a helper method to define mock.On call
// - collectionID int64
// - channelName string
// - scope int32
func (_e *MockTargetManager_Expecter) GetSealedSegmentsByChannel(collectionID interface{}, channelName interface{}, scope interface{}) *MockTargetManager_GetSealedSegmentsByChannel_Call {
return &MockTargetManager_GetSealedSegmentsByChannel_Call{Call: _e.mock.On("GetSealedSegmentsByChannel", collectionID, channelName, scope)}
}
func (_c *MockTargetManager_GetSealedSegmentsByChannel_Call) Run(run func(collectionID int64, channelName string, scope int32)) *MockTargetManager_GetSealedSegmentsByChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(string), args[2].(int32))
})
return _c
}
func (_c *MockTargetManager_GetSealedSegmentsByChannel_Call) Return(_a0 map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByChannel_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_GetSealedSegmentsByChannel_Call) RunAndReturn(run func(int64, string, int32) map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByChannel_Call {
_c.Call.Return(run)
return _c
}
// GetSealedSegmentsByCollection provides a mock function with given fields: collectionID, scope
func (_m *MockTargetManager) GetSealedSegmentsByCollection(collectionID int64, scope int32) map[int64]*datapb.SegmentInfo {
ret := _m.Called(collectionID, scope)
var r0 map[int64]*datapb.SegmentInfo
if rf, ok := ret.Get(0).(func(int64, int32) map[int64]*datapb.SegmentInfo); ok {
r0 = rf(collectionID, scope)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo)
}
}
return r0
}
// MockTargetManager_GetSealedSegmentsByCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSealedSegmentsByCollection'
type MockTargetManager_GetSealedSegmentsByCollection_Call struct {
*mock.Call
}
// GetSealedSegmentsByCollection is a helper method to define mock.On call
// - collectionID int64
// - scope int32
func (_e *MockTargetManager_Expecter) GetSealedSegmentsByCollection(collectionID interface{}, scope interface{}) *MockTargetManager_GetSealedSegmentsByCollection_Call {
return &MockTargetManager_GetSealedSegmentsByCollection_Call{Call: _e.mock.On("GetSealedSegmentsByCollection", collectionID, scope)}
}
func (_c *MockTargetManager_GetSealedSegmentsByCollection_Call) Run(run func(collectionID int64, scope int32)) *MockTargetManager_GetSealedSegmentsByCollection_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int32))
})
return _c
}
func (_c *MockTargetManager_GetSealedSegmentsByCollection_Call) Return(_a0 map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByCollection_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_GetSealedSegmentsByCollection_Call) RunAndReturn(run func(int64, int32) map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByCollection_Call {
_c.Call.Return(run)
return _c
}
// GetSealedSegmentsByPartition provides a mock function with given fields: collectionID, partitionID, scope
func (_m *MockTargetManager) GetSealedSegmentsByPartition(collectionID int64, partitionID int64, scope int32) map[int64]*datapb.SegmentInfo {
ret := _m.Called(collectionID, partitionID, scope)
var r0 map[int64]*datapb.SegmentInfo
if rf, ok := ret.Get(0).(func(int64, int64, int32) map[int64]*datapb.SegmentInfo); ok {
r0 = rf(collectionID, partitionID, scope)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo)
}
}
return r0
}
// MockTargetManager_GetSealedSegmentsByPartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSealedSegmentsByPartition'
type MockTargetManager_GetSealedSegmentsByPartition_Call struct {
*mock.Call
}
// GetSealedSegmentsByPartition is a helper method to define mock.On call
// - collectionID int64
// - partitionID int64
// - scope int32
func (_e *MockTargetManager_Expecter) GetSealedSegmentsByPartition(collectionID interface{}, partitionID interface{}, scope interface{}) *MockTargetManager_GetSealedSegmentsByPartition_Call {
return &MockTargetManager_GetSealedSegmentsByPartition_Call{Call: _e.mock.On("GetSealedSegmentsByPartition", collectionID, partitionID, scope)}
}
func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) Run(run func(collectionID int64, partitionID int64, scope int32)) *MockTargetManager_GetSealedSegmentsByPartition_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].(int64), args[2].(int32))
})
return _c
}
func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) Return(_a0 map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByPartition_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_GetSealedSegmentsByPartition_Call) RunAndReturn(run func(int64, int64, int32) map[int64]*datapb.SegmentInfo) *MockTargetManager_GetSealedSegmentsByPartition_Call {
_c.Call.Return(run)
return _c
}
// IsCurrentTargetExist provides a mock function with given fields: collectionID
func (_m *MockTargetManager) IsCurrentTargetExist(collectionID int64) bool {
ret := _m.Called(collectionID)
var r0 bool
if rf, ok := ret.Get(0).(func(int64) bool); ok {
r0 = rf(collectionID)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockTargetManager_IsCurrentTargetExist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsCurrentTargetExist'
type MockTargetManager_IsCurrentTargetExist_Call struct {
*mock.Call
}
// IsCurrentTargetExist is a helper method to define mock.On call
// - collectionID int64
func (_e *MockTargetManager_Expecter) IsCurrentTargetExist(collectionID interface{}) *MockTargetManager_IsCurrentTargetExist_Call {
return &MockTargetManager_IsCurrentTargetExist_Call{Call: _e.mock.On("IsCurrentTargetExist", collectionID)}
}
func (_c *MockTargetManager_IsCurrentTargetExist_Call) Run(run func(collectionID int64)) *MockTargetManager_IsCurrentTargetExist_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockTargetManager_IsCurrentTargetExist_Call) Return(_a0 bool) *MockTargetManager_IsCurrentTargetExist_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_IsCurrentTargetExist_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_IsCurrentTargetExist_Call {
_c.Call.Return(run)
return _c
}
// IsNextTargetExist provides a mock function with given fields: collectionID
func (_m *MockTargetManager) IsNextTargetExist(collectionID int64) bool {
ret := _m.Called(collectionID)
var r0 bool
if rf, ok := ret.Get(0).(func(int64) bool); ok {
r0 = rf(collectionID)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockTargetManager_IsNextTargetExist_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsNextTargetExist'
type MockTargetManager_IsNextTargetExist_Call struct {
*mock.Call
}
// IsNextTargetExist is a helper method to define mock.On call
// - collectionID int64
func (_e *MockTargetManager_Expecter) IsNextTargetExist(collectionID interface{}) *MockTargetManager_IsNextTargetExist_Call {
return &MockTargetManager_IsNextTargetExist_Call{Call: _e.mock.On("IsNextTargetExist", collectionID)}
}
func (_c *MockTargetManager_IsNextTargetExist_Call) Run(run func(collectionID int64)) *MockTargetManager_IsNextTargetExist_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockTargetManager_IsNextTargetExist_Call) Return(_a0 bool) *MockTargetManager_IsNextTargetExist_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_IsNextTargetExist_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_IsNextTargetExist_Call {
_c.Call.Return(run)
return _c
}
// PullNextTargetV1 provides a mock function with given fields: broker, collectionID, chosenPartitionIDs
func (_m *MockTargetManager) PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) {
_va := make([]interface{}, len(chosenPartitionIDs))
for _i := range chosenPartitionIDs {
_va[_i] = chosenPartitionIDs[_i]
}
var _ca []interface{}
_ca = append(_ca, broker, collectionID)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 map[int64]*datapb.SegmentInfo
var r1 map[string]*DmChannel
var r2 error
if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)); ok {
return rf(broker, collectionID, chosenPartitionIDs...)
}
if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) map[int64]*datapb.SegmentInfo); ok {
r0 = rf(broker, collectionID, chosenPartitionIDs...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo)
}
}
if rf, ok := ret.Get(1).(func(Broker, int64, ...int64) map[string]*DmChannel); ok {
r1 = rf(broker, collectionID, chosenPartitionIDs...)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(map[string]*DmChannel)
}
}
if rf, ok := ret.Get(2).(func(Broker, int64, ...int64) error); ok {
r2 = rf(broker, collectionID, chosenPartitionIDs...)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// MockTargetManager_PullNextTargetV1_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullNextTargetV1'
type MockTargetManager_PullNextTargetV1_Call struct {
*mock.Call
}
// PullNextTargetV1 is a helper method to define mock.On call
// - broker Broker
// - collectionID int64
// - chosenPartitionIDs ...int64
func (_e *MockTargetManager_Expecter) PullNextTargetV1(broker interface{}, collectionID interface{}, chosenPartitionIDs ...interface{}) *MockTargetManager_PullNextTargetV1_Call {
return &MockTargetManager_PullNextTargetV1_Call{Call: _e.mock.On("PullNextTargetV1",
append([]interface{}{broker, collectionID}, chosenPartitionIDs...)...)}
}
func (_c *MockTargetManager_PullNextTargetV1_Call) Run(run func(broker Broker, collectionID int64, chosenPartitionIDs ...int64)) *MockTargetManager_PullNextTargetV1_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]int64, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(int64)
}
}
run(args[0].(Broker), args[1].(int64), variadicArgs...)
})
return _c
}
func (_c *MockTargetManager_PullNextTargetV1_Call) Return(_a0 map[int64]*datapb.SegmentInfo, _a1 map[string]*DmChannel, _a2 error) *MockTargetManager_PullNextTargetV1_Call {
_c.Call.Return(_a0, _a1, _a2)
return _c
}
func (_c *MockTargetManager_PullNextTargetV1_Call) RunAndReturn(run func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)) *MockTargetManager_PullNextTargetV1_Call {
_c.Call.Return(run)
return _c
}
// PullNextTargetV2 provides a mock function with given fields: broker, collectionID, chosenPartitionIDs
func (_m *MockTargetManager) PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error) {
_va := make([]interface{}, len(chosenPartitionIDs))
for _i := range chosenPartitionIDs {
_va[_i] = chosenPartitionIDs[_i]
}
var _ca []interface{}
_ca = append(_ca, broker, collectionID)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 map[int64]*datapb.SegmentInfo
var r1 map[string]*DmChannel
var r2 error
if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)); ok {
return rf(broker, collectionID, chosenPartitionIDs...)
}
if rf, ok := ret.Get(0).(func(Broker, int64, ...int64) map[int64]*datapb.SegmentInfo); ok {
r0 = rf(broker, collectionID, chosenPartitionIDs...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(map[int64]*datapb.SegmentInfo)
}
}
if rf, ok := ret.Get(1).(func(Broker, int64, ...int64) map[string]*DmChannel); ok {
r1 = rf(broker, collectionID, chosenPartitionIDs...)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(map[string]*DmChannel)
}
}
if rf, ok := ret.Get(2).(func(Broker, int64, ...int64) error); ok {
r2 = rf(broker, collectionID, chosenPartitionIDs...)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// MockTargetManager_PullNextTargetV2_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PullNextTargetV2'
type MockTargetManager_PullNextTargetV2_Call struct {
*mock.Call
}
// PullNextTargetV2 is a helper method to define mock.On call
// - broker Broker
// - collectionID int64
// - chosenPartitionIDs ...int64
func (_e *MockTargetManager_Expecter) PullNextTargetV2(broker interface{}, collectionID interface{}, chosenPartitionIDs ...interface{}) *MockTargetManager_PullNextTargetV2_Call {
return &MockTargetManager_PullNextTargetV2_Call{Call: _e.mock.On("PullNextTargetV2",
append([]interface{}{broker, collectionID}, chosenPartitionIDs...)...)}
}
func (_c *MockTargetManager_PullNextTargetV2_Call) Run(run func(broker Broker, collectionID int64, chosenPartitionIDs ...int64)) *MockTargetManager_PullNextTargetV2_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]int64, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(int64)
}
}
run(args[0].(Broker), args[1].(int64), variadicArgs...)
})
return _c
}
func (_c *MockTargetManager_PullNextTargetV2_Call) Return(_a0 map[int64]*datapb.SegmentInfo, _a1 map[string]*DmChannel, _a2 error) *MockTargetManager_PullNextTargetV2_Call {
_c.Call.Return(_a0, _a1, _a2)
return _c
}
func (_c *MockTargetManager_PullNextTargetV2_Call) RunAndReturn(run func(Broker, int64, ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)) *MockTargetManager_PullNextTargetV2_Call {
_c.Call.Return(run)
return _c
}
// Recover provides a mock function with given fields: catalog
func (_m *MockTargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
ret := _m.Called(catalog)
var r0 error
if rf, ok := ret.Get(0).(func(metastore.QueryCoordCatalog) error); ok {
r0 = rf(catalog)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockTargetManager_Recover_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Recover'
type MockTargetManager_Recover_Call struct {
*mock.Call
}
// Recover is a helper method to define mock.On call
// - catalog metastore.QueryCoordCatalog
func (_e *MockTargetManager_Expecter) Recover(catalog interface{}) *MockTargetManager_Recover_Call {
return &MockTargetManager_Recover_Call{Call: _e.mock.On("Recover", catalog)}
}
func (_c *MockTargetManager_Recover_Call) Run(run func(catalog metastore.QueryCoordCatalog)) *MockTargetManager_Recover_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(metastore.QueryCoordCatalog))
})
return _c
}
func (_c *MockTargetManager_Recover_Call) Return(_a0 error) *MockTargetManager_Recover_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_Recover_Call) RunAndReturn(run func(metastore.QueryCoordCatalog) error) *MockTargetManager_Recover_Call {
_c.Call.Return(run)
return _c
}
// RemoveCollection provides a mock function with given fields: collectionID
func (_m *MockTargetManager) RemoveCollection(collectionID int64) {
_m.Called(collectionID)
}
// MockTargetManager_RemoveCollection_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveCollection'
type MockTargetManager_RemoveCollection_Call struct {
*mock.Call
}
// RemoveCollection is a helper method to define mock.On call
// - collectionID int64
func (_e *MockTargetManager_Expecter) RemoveCollection(collectionID interface{}) *MockTargetManager_RemoveCollection_Call {
return &MockTargetManager_RemoveCollection_Call{Call: _e.mock.On("RemoveCollection", collectionID)}
}
func (_c *MockTargetManager_RemoveCollection_Call) Run(run func(collectionID int64)) *MockTargetManager_RemoveCollection_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockTargetManager_RemoveCollection_Call) Return() *MockTargetManager_RemoveCollection_Call {
_c.Call.Return()
return _c
}
func (_c *MockTargetManager_RemoveCollection_Call) RunAndReturn(run func(int64)) *MockTargetManager_RemoveCollection_Call {
_c.Call.Return(run)
return _c
}
// RemovePartition provides a mock function with given fields: collectionID, partitionIDs
func (_m *MockTargetManager) RemovePartition(collectionID int64, partitionIDs ...int64) {
_va := make([]interface{}, len(partitionIDs))
for _i := range partitionIDs {
_va[_i] = partitionIDs[_i]
}
var _ca []interface{}
_ca = append(_ca, collectionID)
_ca = append(_ca, _va...)
_m.Called(_ca...)
}
// MockTargetManager_RemovePartition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemovePartition'
type MockTargetManager_RemovePartition_Call struct {
*mock.Call
}
// RemovePartition is a helper method to define mock.On call
// - collectionID int64
// - partitionIDs ...int64
func (_e *MockTargetManager_Expecter) RemovePartition(collectionID interface{}, partitionIDs ...interface{}) *MockTargetManager_RemovePartition_Call {
return &MockTargetManager_RemovePartition_Call{Call: _e.mock.On("RemovePartition",
append([]interface{}{collectionID}, partitionIDs...)...)}
}
func (_c *MockTargetManager_RemovePartition_Call) Run(run func(collectionID int64, partitionIDs ...int64)) *MockTargetManager_RemovePartition_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]int64, len(args)-1)
for i, a := range args[1:] {
if a != nil {
variadicArgs[i] = a.(int64)
}
}
run(args[0].(int64), variadicArgs...)
})
return _c
}
func (_c *MockTargetManager_RemovePartition_Call) Return() *MockTargetManager_RemovePartition_Call {
_c.Call.Return()
return _c
}
func (_c *MockTargetManager_RemovePartition_Call) RunAndReturn(run func(int64, ...int64)) *MockTargetManager_RemovePartition_Call {
_c.Call.Return(run)
return _c
}
// SaveCurrentTarget provides a mock function with given fields: catalog
func (_m *MockTargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) {
_m.Called(catalog)
}
// MockTargetManager_SaveCurrentTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveCurrentTarget'
type MockTargetManager_SaveCurrentTarget_Call struct {
*mock.Call
}
// SaveCurrentTarget is a helper method to define mock.On call
// - catalog metastore.QueryCoordCatalog
func (_e *MockTargetManager_Expecter) SaveCurrentTarget(catalog interface{}) *MockTargetManager_SaveCurrentTarget_Call {
return &MockTargetManager_SaveCurrentTarget_Call{Call: _e.mock.On("SaveCurrentTarget", catalog)}
}
func (_c *MockTargetManager_SaveCurrentTarget_Call) Run(run func(catalog metastore.QueryCoordCatalog)) *MockTargetManager_SaveCurrentTarget_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(metastore.QueryCoordCatalog))
})
return _c
}
func (_c *MockTargetManager_SaveCurrentTarget_Call) Return() *MockTargetManager_SaveCurrentTarget_Call {
_c.Call.Return()
return _c
}
func (_c *MockTargetManager_SaveCurrentTarget_Call) RunAndReturn(run func(metastore.QueryCoordCatalog)) *MockTargetManager_SaveCurrentTarget_Call {
_c.Call.Return(run)
return _c
}
// UpdateCollectionCurrentTarget provides a mock function with given fields: collectionID
func (_m *MockTargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool {
ret := _m.Called(collectionID)
var r0 bool
if rf, ok := ret.Get(0).(func(int64) bool); ok {
r0 = rf(collectionID)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// MockTargetManager_UpdateCollectionCurrentTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateCollectionCurrentTarget'
type MockTargetManager_UpdateCollectionCurrentTarget_Call struct {
*mock.Call
}
// UpdateCollectionCurrentTarget is a helper method to define mock.On call
// - collectionID int64
func (_e *MockTargetManager_Expecter) UpdateCollectionCurrentTarget(collectionID interface{}) *MockTargetManager_UpdateCollectionCurrentTarget_Call {
return &MockTargetManager_UpdateCollectionCurrentTarget_Call{Call: _e.mock.On("UpdateCollectionCurrentTarget", collectionID)}
}
func (_c *MockTargetManager_UpdateCollectionCurrentTarget_Call) Run(run func(collectionID int64)) *MockTargetManager_UpdateCollectionCurrentTarget_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockTargetManager_UpdateCollectionCurrentTarget_Call) Return(_a0 bool) *MockTargetManager_UpdateCollectionCurrentTarget_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_UpdateCollectionCurrentTarget_Call) RunAndReturn(run func(int64) bool) *MockTargetManager_UpdateCollectionCurrentTarget_Call {
_c.Call.Return(run)
return _c
}
// UpdateCollectionNextTarget provides a mock function with given fields: collectionID
func (_m *MockTargetManager) UpdateCollectionNextTarget(collectionID int64) error {
ret := _m.Called(collectionID)
var r0 error
if rf, ok := ret.Get(0).(func(int64) error); ok {
r0 = rf(collectionID)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockTargetManager_UpdateCollectionNextTarget_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateCollectionNextTarget'
type MockTargetManager_UpdateCollectionNextTarget_Call struct {
*mock.Call
}
// UpdateCollectionNextTarget is a helper method to define mock.On call
// - collectionID int64
func (_e *MockTargetManager_Expecter) UpdateCollectionNextTarget(collectionID interface{}) *MockTargetManager_UpdateCollectionNextTarget_Call {
return &MockTargetManager_UpdateCollectionNextTarget_Call{Call: _e.mock.On("UpdateCollectionNextTarget", collectionID)}
}
func (_c *MockTargetManager_UpdateCollectionNextTarget_Call) Run(run func(collectionID int64)) *MockTargetManager_UpdateCollectionNextTarget_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockTargetManager_UpdateCollectionNextTarget_Call) Return(_a0 error) *MockTargetManager_UpdateCollectionNextTarget_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockTargetManager_UpdateCollectionNextTarget_Call) RunAndReturn(run func(int64) error) *MockTargetManager_UpdateCollectionNextTarget_Call {
_c.Call.Return(run)
return _c
}
// NewMockTargetManager creates a new instance of MockTargetManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockTargetManager(t interface {
mock.TestingT
Cleanup(func())
}) *MockTargetManager {
mock := &MockTargetManager{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -50,6 +50,29 @@ const (
NextTargetFirst
)
type TargetManagerInterface interface {
UpdateCollectionCurrentTarget(collectionID int64) bool
UpdateCollectionNextTarget(collectionID int64) error
PullNextTargetV1(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)
PullNextTargetV2(broker Broker, collectionID int64, chosenPartitionIDs ...int64) (map[int64]*datapb.SegmentInfo, map[string]*DmChannel, error)
RemoveCollection(collectionID int64)
RemovePartition(collectionID int64, partitionIDs ...int64)
GetGrowingSegmentsByCollection(collectionID int64, scope TargetScope) typeutil.UniqueSet
GetGrowingSegmentsByChannel(collectionID int64, channelName string, scope TargetScope) typeutil.UniqueSet
GetSealedSegmentsByCollection(collectionID int64, scope TargetScope) map[int64]*datapb.SegmentInfo
GetSealedSegmentsByChannel(collectionID int64, channelName string, scope TargetScope) map[int64]*datapb.SegmentInfo
GetDroppedSegmentsByChannel(collectionID int64, channelName string, scope TargetScope) []int64
GetSealedSegmentsByPartition(collectionID int64, partitionID int64, scope TargetScope) map[int64]*datapb.SegmentInfo
GetDmChannelsByCollection(collectionID int64, scope TargetScope) map[string]*DmChannel
GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel
GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo
GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64
IsCurrentTargetExist(collectionID int64) bool
IsNextTargetExist(collectionID int64) bool
SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
Recover(catalog metastore.QueryCoordCatalog) error
}
type TargetManager struct {
rwMutex sync.RWMutex
broker Broker

View File

@ -67,6 +67,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/gc"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -130,6 +131,10 @@ type QueryNode struct {
// parameter turning hook
queryHook optimizers.QueryHook
// record the last modify ts of segment/channel distribution
lastModifyLock lock.RWMutex
lastModifyTs int64
}
// NewQueryNode will return a QueryNode with abnormal state.

View File

@ -21,6 +21,7 @@ import (
"fmt"
"strconv"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
@ -190,6 +191,8 @@ func (node *QueryNode) composeIndexMeta(indexInfos []*indexpb.IndexInfo, schema
// WatchDmChannels create consumers on dmChannels to receive Incremental datawhich is the important part of real-time query
func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (status *commonpb.Status, e error) {
defer node.updateDistributionModifyTS()
channel := req.GetInfos()[0]
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
@ -339,6 +342,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
}
func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannelRequest) (*commonpb.Status, error) {
defer node.updateDistributionModifyTS()
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
zap.String("channel", req.GetChannelName()),
@ -396,6 +400,7 @@ func (node *QueryNode) LoadPartitions(ctx context.Context, req *querypb.LoadPart
// LoadSegments load historical data into query node, historical data can be vector data or index
func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
defer node.updateDistributionModifyTS()
segment := req.GetInfos()[0]
log := log.Ctx(ctx).With(
@ -527,6 +532,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, req *querypb.Relea
// ReleaseSegments remove the specified segments from query node according segmentIDs, partitionIDs, and collectionID
func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
defer node.updateDistributionModifyTS()
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionID()),
zap.String("shard", req.GetShard()),
@ -1174,6 +1180,23 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
}
defer node.lifetime.Done()
lastModifyTs := node.getDistributionModifyTS()
distributionChange := func() bool {
if req.GetLastUpdateTs() == 0 {
return true
}
return req.GetLastUpdateTs() < lastModifyTs
}
if !distributionChange() {
return &querypb.GetDataDistributionResponse{
Status: merr.Success(),
NodeID: node.GetNodeID(),
LastModifyTs: lastModifyTs,
}, nil
}
sealedSegments := node.manager.Segment.GetBy(segments.WithType(commonpb.SegmentState_Sealed))
segmentVersionInfos := make([]*querypb.SegmentVersionInfo, 0, len(sealedSegments))
for _, s := range sealedSegments {
@ -1239,15 +1262,18 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
})
return &querypb.GetDataDistributionResponse{
Status: merr.Success(),
NodeID: node.GetNodeID(),
Segments: segmentVersionInfos,
Channels: channelVersionInfos,
LeaderViews: leaderViews,
Status: merr.Success(),
NodeID: node.GetNodeID(),
Segments: segmentVersionInfos,
Channels: channelVersionInfos,
LeaderViews: leaderViews,
LastModifyTs: lastModifyTs,
}, nil
}
func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) {
defer node.updateDistributionModifyTS()
log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()),
zap.String("channel", req.GetChannel()), zap.Int64("currentNodeID", node.GetNodeID()))
// check node healthy
@ -1402,3 +1428,16 @@ func (req *deleteRequestStringer) String() string {
tss := req.GetTimestamps()
return fmt.Sprintf("%s, timestamp range: [%d-%d]", pkInfo, tss[0], tss[len(tss)-1])
}
func (node *QueryNode) updateDistributionModifyTS() {
node.lastModifyLock.Lock()
defer node.lastModifyLock.Unlock()
node.lastModifyTs = time.Now().UnixNano()
}
func (node *QueryNode) getDistributionModifyTS() int64 {
node.lastModifyLock.RLock()
defer node.lastModifyLock.RUnlock()
return node.lastModifyTs
}