enhance: Make dynamic load/release partition follow targets (#38059)

Related to #37849
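
At a high level, this change removes the direct LoadPartitions/ReleasePartitions RPC fan-out from the QueryCoord load, release, and sync-partition jobs. Partition state now follows targets instead: jobs write collection/partition meta, update the next target, and wait for the current target to catch up through the TargetObserver, while the shard delegator keeps a working partition list per distribution snapshot and marks segments outside that list as unreadable. Below is a minimal sketch of the new synchronization helper, simplified from the hunk in this diff that defines waitCurrentTargetUpdated (logging omitted):

package job

import (
	"context"

	"github.com/milvus-io/milvus/internal/querycoordv2/observers"
)

// waitCurrentTargetUpdated triggers a next-target update and blocks until the
// TargetObserver reports that the current target has caught up, or ctx is done.
func waitCurrentTargetUpdated(ctx context.Context, targetObserver *observers.TargetObserver, collection int64) error {
	// manually trigger a next-target update for the collection
	ready, err := targetObserver.UpdateNextTarget(collection)
	if err != nil {
		return err
	}
	// ask the observer to promote the next target to current as soon as possible
	targetObserver.TriggerUpdateCurrentTarget(collection)
	// wait until the current target is ready, or give up when the context ends
	select {
	case <-ready:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

The release and sync-new-partition jobs call this helper after notifying the observer, so follow-up queries observe the updated partition list before the job reports success.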

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/38252/head
congqixia 2024-12-05 16:24:40 +08:00 committed by GitHub
parent 32645fc28a
commit 051bc280dd
26 changed files with 399 additions and 539 deletions


@ -50,7 +50,6 @@ type LoadCollectionJob struct {
dist *meta.DistributionManager
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
collectionObserver *observers.CollectionObserver
@ -63,7 +62,6 @@ func NewLoadCollectionJob(
dist *meta.DistributionManager,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
collectionObserver *observers.CollectionObserver,
@ -72,11 +70,10 @@ func NewLoadCollectionJob(
return &LoadCollectionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
undo: NewUndoList(ctx, meta, targetMgr, targetObserver),
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
collectionObserver: collectionObserver,
@ -193,12 +190,6 @@ func (job *LoadCollectionJob) Execute() error {
job.undo.IsReplicaCreated = true
}
// 3. loadPartitions on QueryNodes
err = loadPartitions(job.ctx, job.meta, job.cluster, job.broker, true, req.GetCollectionID(), lackPartitionIDs...)
if err != nil {
return err
}
// 4. put collection/partitions meta
partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition {
return &meta.Partition{
@ -264,7 +255,6 @@ type LoadPartitionJob struct {
dist *meta.DistributionManager
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
collectionObserver *observers.CollectionObserver
@ -277,7 +267,6 @@ func NewLoadPartitionJob(
dist *meta.DistributionManager,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
collectionObserver *observers.CollectionObserver,
@ -286,11 +275,10 @@ func NewLoadPartitionJob(
return &LoadPartitionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
undo: NewUndoList(ctx, meta, targetMgr, targetObserver),
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
collectionObserver: collectionObserver,
@ -399,12 +387,6 @@ func (job *LoadPartitionJob) Execute() error {
job.undo.IsReplicaCreated = true
}
// 3. loadPartitions on QueryNodes
err = loadPartitions(job.ctx, job.meta, job.cluster, job.broker, true, req.GetCollectionID(), lackPartitionIDs...)
if err != nil {
return err
}
// 4. put collection/partitions meta
partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition {
return &meta.Partition{


@ -53,7 +53,6 @@ func NewReleaseCollectionJob(ctx context.Context,
dist *meta.DistributionManager,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
@ -65,7 +64,6 @@ func NewReleaseCollectionJob(ctx context.Context,
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
@ -86,7 +84,6 @@ func (job *ReleaseCollectionJob) Execute() error {
toRelease := lo.Map(loadedPartitions, func(partition *meta.Partition, _ int) int64 {
return partition.GetPartitionID()
})
releasePartitions(job.ctx, job.meta, job.cluster, req.GetCollectionID(), toRelease...)
err := job.meta.CollectionManager.RemoveCollection(job.ctx, req.GetCollectionID())
if err != nil {
@ -137,7 +134,6 @@ func NewReleasePartitionJob(ctx context.Context,
dist *meta.DistributionManager,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
@ -149,7 +145,6 @@ func NewReleasePartitionJob(ctx context.Context,
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
@ -178,7 +173,6 @@ func (job *ReleasePartitionJob) Execute() error {
log.Warn("releasing partition(s) not loaded")
return nil
}
releasePartitions(job.ctx, job.meta, job.cluster, req.GetCollectionID(), toRelease...)
// If all partitions are released, clear all
if len(toRelease) == len(loadedPartitions) {
@ -211,6 +205,8 @@ func (job *ReleasePartitionJob) Execute() error {
return errors.Wrap(err, msg)
}
job.targetObserver.ReleasePartition(req.GetCollectionID(), toRelease...)
// wait until the current target is updated, so following queries will act as expected
waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID())
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...)
}
return nil


@ -25,31 +25,36 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
)
type SyncNewCreatedPartitionJob struct {
*BaseJob
req *querypb.SyncNewCreatedPartitionRequest
meta *meta.Meta
cluster session.Cluster
broker meta.Broker
req *querypb.SyncNewCreatedPartitionRequest
meta *meta.Meta
cluster session.Cluster
broker meta.Broker
targetObserver *observers.TargetObserver
targetMgr meta.TargetManagerInterface
}
func NewSyncNewCreatedPartitionJob(
ctx context.Context,
req *querypb.SyncNewCreatedPartitionRequest,
meta *meta.Meta,
cluster session.Cluster,
broker meta.Broker,
targetObserver *observers.TargetObserver,
targetMgr meta.TargetManagerInterface,
) *SyncNewCreatedPartitionJob {
return &SyncNewCreatedPartitionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
meta: meta,
cluster: cluster,
broker: broker,
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
meta: meta,
broker: broker,
targetObserver: targetObserver,
targetMgr: targetMgr,
}
}
@ -75,11 +80,6 @@ func (job *SyncNewCreatedPartitionJob) Execute() error {
return nil
}
err := loadPartitions(job.ctx, job.meta, job.cluster, job.broker, false, req.GetCollectionID(), req.GetPartitionID())
if err != nil {
return err
}
partition := &meta.Partition{
PartitionLoadInfo: &querypb.PartitionLoadInfo{
CollectionID: req.GetCollectionID(),
@ -89,12 +89,12 @@ func (job *SyncNewCreatedPartitionJob) Execute() error {
LoadPercentage: 100,
CreatedAt: time.Now(),
}
err = job.meta.CollectionManager.PutPartition(job.ctx, partition)
err := job.meta.CollectionManager.PutPartition(job.ctx, partition)
if err != nil {
msg := "failed to store partitions"
log.Warn(msg, zap.Error(err))
return errors.Wrap(err, msg)
}
return nil
return waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID())
}


@ -18,8 +18,8 @@ package job
import (
"context"
"fmt"
"testing"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
@ -39,11 +39,13 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
@ -136,15 +138,10 @@ func (suite *JobSuite) SetupSuite() {
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
Return(nil, nil)
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).
Return(nil, nil)
Return(nil, nil).Maybe()
suite.cluster = session.NewMockCluster(suite.T())
suite.cluster.EXPECT().
LoadPartitions(mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil)
suite.cluster.EXPECT().
ReleasePartitions(mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil).Maybe()
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
suite.proxyManager = proxyutil.NewMockProxyClientManager(suite.T())
suite.proxyManager.EXPECT().InvalidateCollectionMetaCache(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
@ -247,7 +244,6 @@ func (suite *JobSuite) TestLoadCollection() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -275,7 +271,6 @@ func (suite *JobSuite) TestLoadCollection() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -301,7 +296,6 @@ func (suite *JobSuite) TestLoadCollection() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -329,7 +323,6 @@ func (suite *JobSuite) TestLoadCollection() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -365,7 +358,6 @@ func (suite *JobSuite) TestLoadCollection() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -387,7 +379,6 @@ func (suite *JobSuite) TestLoadCollection() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -417,7 +408,6 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -449,7 +439,6 @@ func (suite *JobSuite) TestLoadCollectionWithLoadFields() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -479,7 +468,6 @@ func (suite *JobSuite) TestLoadCollectionWithLoadFields() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -507,7 +495,6 @@ func (suite *JobSuite) TestLoadCollectionWithLoadFields() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -541,7 +528,6 @@ func (suite *JobSuite) TestLoadCollectionWithLoadFields() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -574,7 +560,6 @@ func (suite *JobSuite) TestLoadPartition() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -605,7 +590,6 @@ func (suite *JobSuite) TestLoadPartition() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -633,7 +617,6 @@ func (suite *JobSuite) TestLoadPartition() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -661,7 +644,6 @@ func (suite *JobSuite) TestLoadPartition() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -688,7 +670,6 @@ func (suite *JobSuite) TestLoadPartition() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -724,7 +705,6 @@ func (suite *JobSuite) TestLoadPartition() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -747,7 +727,6 @@ func (suite *JobSuite) TestLoadPartition() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -780,7 +759,6 @@ func (suite *JobSuite) TestLoadPartitionWithLoadFields() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -813,7 +791,6 @@ func (suite *JobSuite) TestLoadPartitionWithLoadFields() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -843,7 +820,6 @@ func (suite *JobSuite) TestLoadPartitionWithLoadFields() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -879,7 +855,6 @@ func (suite *JobSuite) TestLoadPartitionWithLoadFields() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -909,7 +884,6 @@ func (suite *JobSuite) TestDynamicLoad() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -928,7 +902,6 @@ func (suite *JobSuite) TestDynamicLoad() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -1028,7 +1001,6 @@ func (suite *JobSuite) TestLoadPartitionWithReplicas() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -1056,7 +1028,6 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
@ -1080,7 +1051,6 @@ func (suite *JobSuite) TestReleaseCollection() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
@ -1110,7 +1080,6 @@ func (suite *JobSuite) TestReleasePartition() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
@ -1134,7 +1103,6 @@ func (suite *JobSuite) TestReleasePartition() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
@ -1149,6 +1117,12 @@ func (suite *JobSuite) TestReleasePartition() {
// Test release partial partitions
suite.releaseAll()
suite.loadAll()
for _, collectionID := range suite.collections {
// make collection able to get into loaded state
suite.updateChannelDist(ctx, collectionID, true)
suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...)
waitCurrentTargetUpdated(ctx, suite.targetObserver, collectionID)
}
for _, collection := range suite.collections {
req := &querypb.ReleasePartitionsRequest{
CollectionID: collection,
@ -1160,13 +1134,14 @@ func (suite *JobSuite) TestReleasePartition() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(job)
suite.updateChannelDist(ctx, collection, true)
suite.updateSegmentDist(collection, 3000, suite.partitions[collection][:1]...)
err := job.Wait()
suite.NoError(err)
suite.True(suite.meta.Exist(ctx, collection))
@ -1194,7 +1169,6 @@ func (suite *JobSuite) TestDynamicRelease() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
@ -1212,7 +1186,6 @@ func (suite *JobSuite) TestDynamicRelease() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
@ -1225,8 +1198,18 @@ func (suite *JobSuite) TestDynamicRelease() {
// action: release p0
// expect: p0 released, p1, p2 loaded
suite.loadAll()
for _, collectionID := range suite.collections {
// make collection able to get into loaded state
suite.updateChannelDist(ctx, collectionID, true)
suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...)
waitCurrentTargetUpdated(ctx, suite.targetObserver, collectionID)
}
job := newReleasePartJob(col0, p0)
suite.scheduler.Add(job)
// update segments
suite.updateSegmentDist(col0, 3000, p1, p2)
suite.updateChannelDist(ctx, col0, true)
err := job.Wait()
suite.NoError(err)
suite.assertPartitionReleased(col0, p0)
@ -1237,6 +1220,8 @@ func (suite *JobSuite) TestDynamicRelease() {
// expect: p1 released, p2 loaded
job = newReleasePartJob(col0, p0, p1)
suite.scheduler.Add(job)
suite.updateSegmentDist(col0, 3000, p2)
suite.updateChannelDist(ctx, col0, true)
err = job.Wait()
suite.NoError(err)
suite.assertPartitionReleased(col0, p0, p1)
@ -1247,6 +1232,8 @@ func (suite *JobSuite) TestDynamicRelease() {
// expect: loadType=col: col loaded, p2 released
job = newReleasePartJob(col0, p2)
suite.scheduler.Add(job)
suite.updateSegmentDist(col0, 3000)
suite.updateChannelDist(ctx, col0, false)
err = job.Wait()
suite.NoError(err)
suite.assertPartitionReleased(col0, p0, p1, p2)
@ -1307,7 +1294,6 @@ func (suite *JobSuite) TestLoadCollectionStoreFailed() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -1350,7 +1336,6 @@ func (suite *JobSuite) TestLoadPartitionStoreFailed() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -1378,7 +1363,6 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -1390,183 +1374,28 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() {
}
}
func (suite *JobSuite) TestCallLoadPartitionFailed() {
// call LoadPartitions failed at get index info
getIndexErr := fmt.Errorf("mock get index error")
suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool {
return call.Method != "ListIndexes"
})
for _, collection := range suite.collections {
suite.broker.EXPECT().ListIndexes(mock.Anything, collection).Return(nil, getIndexErr)
loadCollectionReq := &querypb.LoadCollectionRequest{
CollectionID: collection,
}
loadCollectionJob := NewLoadCollectionJob(
context.Background(),
loadCollectionReq,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(loadCollectionJob)
err := loadCollectionJob.Wait()
suite.T().Logf("%s", err)
suite.ErrorIs(err, getIndexErr)
loadPartitionReq := &querypb.LoadPartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
}
loadPartitionJob := NewLoadPartitionJob(
context.Background(),
loadPartitionReq,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(loadPartitionJob)
err = loadPartitionJob.Wait()
suite.ErrorIs(err, getIndexErr)
}
// call LoadPartitions failed at get schema
getSchemaErr := fmt.Errorf("mock get schema error")
suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool {
return call.Method != "DescribeCollection"
})
for _, collection := range suite.collections {
suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(nil, getSchemaErr)
loadCollectionReq := &querypb.LoadCollectionRequest{
CollectionID: collection,
}
loadCollectionJob := NewLoadCollectionJob(
context.Background(),
loadCollectionReq,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(loadCollectionJob)
err := loadCollectionJob.Wait()
suite.ErrorIs(err, getSchemaErr)
loadPartitionReq := &querypb.LoadPartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
}
loadPartitionJob := NewLoadPartitionJob(
context.Background(),
loadPartitionReq,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
suite.nodeMgr,
)
suite.scheduler.Add(loadPartitionJob)
err = loadPartitionJob.Wait()
suite.ErrorIs(err, getSchemaErr)
}
suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool {
return call.Method != "ListIndexes" && call.Method != "DescribeCollection"
})
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil)
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil)
}
func (suite *JobSuite) TestCallReleasePartitionFailed() {
ctx := context.Background()
suite.loadAll()
releasePartitionErr := fmt.Errorf("mock release partitions error")
suite.cluster.ExpectedCalls = lo.Filter(suite.cluster.ExpectedCalls, func(call *mock.Call, _ int) bool {
return call.Method != "ReleasePartitions"
})
suite.cluster.EXPECT().ReleasePartitions(mock.Anything, mock.Anything, mock.Anything).
Return(nil, releasePartitionErr)
for _, collection := range suite.collections {
releaseCollectionReq := &querypb.ReleaseCollectionRequest{
CollectionID: collection,
}
releaseCollectionJob := NewReleaseCollectionJob(
ctx,
releaseCollectionReq,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(releaseCollectionJob)
err := releaseCollectionJob.Wait()
suite.NoError(err)
releasePartitionReq := &querypb.ReleasePartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection],
}
releasePartitionJob := NewReleasePartitionJob(
ctx,
releasePartitionReq,
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
suite.proxyManager,
)
suite.scheduler.Add(releasePartitionJob)
err = releasePartitionJob.Wait()
suite.NoError(err)
}
suite.cluster.ExpectedCalls = lo.Filter(suite.cluster.ExpectedCalls, func(call *mock.Call, _ int) bool {
return call.Method != "ReleasePartitions"
})
suite.cluster.EXPECT().ReleasePartitions(mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil)
}
func (suite *JobSuite) TestSyncNewCreatedPartition() {
newPartition := int64(999)
ctx := context.Background()
// test sync new created partition
suite.loadAll()
collectionID := suite.collections[0]
// make collection able to get into loaded state
suite.updateChannelDist(ctx, collectionID, true)
suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...)
req := &querypb.SyncNewCreatedPartitionRequest{
CollectionID: suite.collections[0],
CollectionID: collectionID,
PartitionID: newPartition,
}
job := NewSyncNewCreatedPartitionJob(
context.Background(),
ctx,
req,
suite.meta,
suite.cluster,
suite.broker,
suite.targetObserver,
suite.targetMgr,
)
suite.scheduler.Add(job)
err := job.Wait()
@ -1581,11 +1410,12 @@ func (suite *JobSuite) TestSyncNewCreatedPartition() {
PartitionID: newPartition,
}
job = NewSyncNewCreatedPartitionJob(
context.Background(),
ctx,
req,
suite.meta,
suite.cluster,
suite.broker,
suite.targetObserver,
suite.targetMgr,
)
suite.scheduler.Add(job)
err = job.Wait()
@ -1597,11 +1427,12 @@ func (suite *JobSuite) TestSyncNewCreatedPartition() {
PartitionID: newPartition,
}
job = NewSyncNewCreatedPartitionJob(
context.Background(),
ctx,
req,
suite.meta,
suite.cluster,
suite.broker,
suite.targetObserver,
suite.targetMgr,
)
suite.scheduler.Add(job)
err = job.Wait()
@ -1621,7 +1452,6 @@ func (suite *JobSuite) loadAll() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -1646,7 +1476,6 @@ func (suite *JobSuite) loadAll() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -1676,7 +1505,6 @@ func (suite *JobSuite) releaseAll() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.checkerController,
@ -1746,6 +1574,59 @@ func (suite *JobSuite) assertPartitionReleased(collection int64, partitionIDs ..
}
}
func (suite *JobSuite) updateSegmentDist(collection, node int64, partitions ...int64) {
partitionSet := typeutil.NewSet(partitions...)
metaSegments := make([]*meta.Segment, 0)
for partition, segments := range suite.segments[collection] {
if !partitionSet.Contain(partition) {
continue
}
for _, segment := range segments {
metaSegments = append(metaSegments,
utils.CreateTestSegment(collection, partition, segment, node, 1, "test-channel"))
}
}
suite.dist.SegmentDistManager.Update(node, metaSegments...)
}
func (suite *JobSuite) updateChannelDist(ctx context.Context, collection int64, loaded bool) {
channels := suite.channels[collection]
segments := lo.Flatten(lo.Values(suite.segments[collection]))
replicas := suite.meta.ReplicaManager.GetByCollection(ctx, collection)
for _, replica := range replicas {
if loaded {
i := 0
for _, node := range replica.GetNodes() {
suite.dist.ChannelDistManager.Update(node, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
CollectionID: collection,
ChannelName: channels[i],
}))
suite.dist.LeaderViewManager.Update(node, &meta.LeaderView{
ID: node,
CollectionID: collection,
Channel: channels[i],
Segments: lo.SliceToMap(segments, func(segment int64) (int64, *querypb.SegmentDist) {
return segment, &querypb.SegmentDist{
NodeID: node,
Version: time.Now().Unix(),
}
}),
})
i++
if i >= len(channels) {
break
}
}
} else {
for _, node := range replica.GetNodes() {
suite.dist.ChannelDistManager.Update(node)
suite.dist.LeaderViewManager.Update(node)
}
}
}
}
func TestJob(t *testing.T) {
suite.Run(t, new(JobSuite))
}


@ -23,7 +23,6 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
)
@ -37,18 +36,16 @@ type UndoList struct {
ctx context.Context
meta *meta.Meta
cluster session.Cluster
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
}
func NewUndoList(ctx context.Context, meta *meta.Meta,
cluster session.Cluster, targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver,
targetMgr meta.TargetManagerInterface, targetObserver *observers.TargetObserver,
) *UndoList {
return &UndoList{
ctx: ctx,
meta: meta,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
}


@ -23,14 +23,10 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -68,82 +64,21 @@ func waitCollectionReleased(dist *meta.DistributionManager, checkerController *c
}
}
func loadPartitions(ctx context.Context,
meta *meta.Meta,
cluster session.Cluster,
broker meta.Broker,
withSchema bool,
collection int64,
partitions ...int64,
) error {
var err error
var schema *schemapb.CollectionSchema
if withSchema {
collectionInfo, err := broker.DescribeCollection(ctx, collection)
if err != nil {
return err
}
schema = collectionInfo.GetSchema()
}
indexes, err := broker.ListIndexes(ctx, collection)
func waitCurrentTargetUpdated(ctx context.Context, targetObserver *observers.TargetObserver, collection int64) error {
// manually trigger a next target update
ready, err := targetObserver.UpdateNextTarget(collection)
if err != nil {
log.Warn("failed to update next target for sync partition job", zap.Error(err))
return err
}
replicas := meta.ReplicaManager.GetByCollection(ctx, collection)
loadReq := &querypb.LoadPartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadPartitions,
},
CollectionID: collection,
PartitionIDs: partitions,
Schema: schema,
IndexInfoList: indexes,
}
for _, replica := range replicas {
for _, node := range replica.GetNodes() {
status, err := cluster.LoadPartitions(ctx, node, loadReq)
// There is no need to rollback LoadPartitions as the load job will fail
// and the Delegator will not be created,
// resulting in search and query requests failing due to the absence of Delegator.
if err != nil {
return err
}
if !merr.Ok(status) {
return merr.Error(status)
}
}
}
return nil
}
func releasePartitions(ctx context.Context,
meta *meta.Meta,
cluster session.Cluster,
collection int64,
partitions ...int64,
) {
log := log.Ctx(ctx).With(zap.Int64("collection", collection), zap.Int64s("partitions", partitions))
replicas := meta.ReplicaManager.GetByCollection(ctx, collection)
releaseReq := &querypb.ReleasePartitionsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_ReleasePartitions,
},
CollectionID: collection,
PartitionIDs: partitions,
}
for _, replica := range replicas {
for _, node := range replica.GetNodes() {
status, err := cluster.ReleasePartitions(ctx, node, releaseReq)
// Ignore error as the Delegator will be removed from the query node,
// causing search and query requests to fail due to the absence of Delegator.
if err != nil {
log.Warn("failed to ReleasePartitions", zap.Int64("node", node), zap.Error(err))
continue
}
if !merr.Ok(status) {
log.Warn("failed to ReleasePartitions", zap.Int64("node", node), zap.Error(merr.Error(status)))
}
}
// accelerate check
targetObserver.TriggerUpdateCurrentTarget(collection)
// wait current target ready
select {
case <-ready:
return nil
case <-ctx.Done():
return ctx.Err()
}
}


@ -375,6 +375,66 @@ func (_c *MockTargetManager_GetGrowingSegmentsByCollection_Call) RunAndReturn(ru
return _c
}
// GetPartitions provides a mock function with given fields: ctx, collectionID, scope
func (_m *MockTargetManager) GetPartitions(ctx context.Context, collectionID int64, scope int32) ([]int64, error) {
ret := _m.Called(ctx, collectionID, scope)
if len(ret) == 0 {
panic("no return value specified for GetPartitions")
}
var r0 []int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, int64, int32) ([]int64, error)); ok {
return rf(ctx, collectionID, scope)
}
if rf, ok := ret.Get(0).(func(context.Context, int64, int32) []int64); ok {
r0 = rf(ctx, collectionID, scope)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]int64)
}
}
if rf, ok := ret.Get(1).(func(context.Context, int64, int32) error); ok {
r1 = rf(ctx, collectionID, scope)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockTargetManager_GetPartitions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetPartitions'
type MockTargetManager_GetPartitions_Call struct {
*mock.Call
}
// GetPartitions is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - scope int32
func (_e *MockTargetManager_Expecter) GetPartitions(ctx interface{}, collectionID interface{}, scope interface{}) *MockTargetManager_GetPartitions_Call {
return &MockTargetManager_GetPartitions_Call{Call: _e.mock.On("GetPartitions", ctx, collectionID, scope)}
}
func (_c *MockTargetManager_GetPartitions_Call) Run(run func(ctx context.Context, collectionID int64, scope int32)) *MockTargetManager_GetPartitions_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(int64), args[2].(int32))
})
return _c
}
func (_c *MockTargetManager_GetPartitions_Call) Return(_a0 []int64, _a1 error) *MockTargetManager_GetPartitions_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockTargetManager_GetPartitions_Call) RunAndReturn(run func(context.Context, int64, int32) ([]int64, error)) *MockTargetManager_GetPartitions_Call {
_c.Call.Return(run)
return _c
}
// GetSealedSegment provides a mock function with given fields: ctx, collectionID, id, scope
func (_m *MockTargetManager) GetSealedSegment(ctx context.Context, collectionID int64, id int64, scope int32) *datapb.SegmentInfo {
ret := _m.Called(ctx, collectionID, id, scope)


@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@ -70,6 +71,7 @@ type TargetManagerInterface interface {
Recover(ctx context.Context, catalog metastore.QueryCoordCatalog) error
CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool
GetTargetJSON(ctx context.Context, scope TargetScope) string
GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error)
}
type TargetManager struct {
@ -140,9 +142,9 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(ctx context.Context, col
func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collectionID int64) error {
var vChannelInfos []*datapb.VchannelInfo
var segmentInfos []*datapb.SegmentInfo
err := retry.Handle(context.TODO(), func() (bool, error) {
err := retry.Handle(ctx, func() (bool, error) {
var err error
vChannelInfos, segmentInfos, err = mgr.broker.GetRecoveryInfoV2(context.TODO(), collectionID)
vChannelInfos, segmentInfos, err = mgr.broker.GetRecoveryInfoV2(ctx, collectionID)
if err != nil {
return true, err
}
@ -651,6 +653,18 @@ func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope)
return string(v)
}
func (mgr *TargetManager) GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error) {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
ret := mgr.getCollectionTarget(scope, collectionID)
if len(ret) == 0 {
return nil, merr.WrapErrCollectionNotLoaded(collectionID)
}
return ret[0].partitions.Collect(), nil
}
func (mgr *TargetManager) getTarget(scope TargetScope) *target {
if scope == CurrentTarget {
return mgr.current


@ -174,6 +174,7 @@ func (suite *CollectionObserverSuite) SetupSuite() {
}
func (suite *CollectionObserverSuite) SetupTest() {
suite.ctx = context.Background()
// Mocks
var err error
suite.idAllocator = RandomIncrementIDAllocator()


@ -26,7 +26,6 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@ -398,7 +397,6 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...)
}
var collectionInfo *milvuspb.DescribeCollectionResponse
var partitions []int64
var indexInfo []*indexpb.IndexInfo
var err error
@ -413,16 +411,9 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
log.Warn("replica not found", zap.Int64("nodeID", leader.ID), zap.Int64("collectionID", collectionID))
continue
}
// init all the meta information
if collectionInfo == nil {
collectionInfo, err = ob.broker.DescribeCollection(ctx, collectionID)
if err != nil {
log.Warn("failed to get collection info", zap.Error(err))
return false
}
partitions, err = utils.GetPartitions(ctx, ob.meta.CollectionManager, collectionID)
if partitions == nil {
partitions, err = utils.GetPartitions(ctx, ob.targetMgr, collectionID)
if err != nil {
log.Warn("failed to get partitions", zap.Error(err))
return false
@ -436,7 +427,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
}
}
if !ob.sync(ctx, replica, leader, []*querypb.SyncAction{updateVersionAction}, collectionInfo, partitions, indexInfo) {
if !ob.sync(ctx, replica, leader, []*querypb.SyncAction{updateVersionAction}, partitions, indexInfo) {
return false
}
}
@ -444,7 +435,7 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
}
func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction,
collectionInfo *milvuspb.DescribeCollectionResponse, partitions []int64, indexInfo []*indexpb.IndexInfo,
partitions []int64, indexInfo []*indexpb.IndexInfo,
) bool {
if len(diffs) == 0 {
return true
@ -465,12 +456,10 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade
ReplicaID: replicaID,
Channel: leaderView.Channel,
Actions: diffs,
Schema: collectionInfo.GetSchema(),
LoadMeta: &querypb.LoadMetaInfo{
LoadType: ob.meta.GetLoadType(ctx, leaderView.CollectionID),
CollectionID: leaderView.CollectionID,
PartitionIDs: partitions,
DbName: collectionInfo.GetDbName(),
ResourceGroup: replica.GetResourceGroup(),
},
Version: time.Now().UnixNano(),
@ -478,6 +467,7 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade
}
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond))
defer cancel()
resp, err := ob.cluster.SyncDistribution(ctx, leaderView.ID, req)
if err != nil {
log.Warn("failed to sync distribution", zap.Error(err))


@ -287,7 +287,6 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
s.dist,
s.meta,
s.broker,
s.cluster,
s.targetMgr,
s.targetObserver,
s.collectionObserver,
@ -328,7 +327,6 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
s.dist,
s.meta,
s.broker,
s.cluster,
s.targetMgr,
s.targetObserver,
s.checkerController,
@ -404,7 +402,6 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
s.dist,
s.meta,
s.broker,
s.cluster,
s.targetMgr,
s.targetObserver,
s.collectionObserver,
@ -451,7 +448,6 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
s.dist,
s.meta,
s.broker,
s.cluster,
s.targetMgr,
s.targetObserver,
s.checkerController,
@ -596,7 +592,7 @@ func (s *Server) SyncNewCreatedPartition(ctx context.Context, req *querypb.SyncN
return merr.Status(err), nil
}
syncJob := job.NewSyncNewCreatedPartitionJob(ctx, req, s.meta, s.cluster, s.broker)
syncJob := job.NewSyncNewCreatedPartitionJob(ctx, req, s.meta, s.broker, s.targetObserver, s.targetMgr)
s.jobScheduler.Add(syncJob)
err := syncJob.Wait()
if err != nil {

View File

@ -151,6 +151,8 @@ func (suite *ServiceSuite) SetupTest() {
suite.meta = meta.NewMeta(params.RandomIncrementIDAllocator(), suite.store, suite.nodeMgr)
suite.broker = meta.NewMockBroker(suite.T())
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.cluster = session.NewMockCluster(suite.T())
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
suite.targetObserver = observers.NewTargetObserver(
suite.meta,
suite.targetMgr,
@ -168,8 +170,6 @@ func (suite *ServiceSuite) SetupTest() {
}))
suite.meta.ResourceManager.HandleNodeUp(context.TODO(), node)
}
suite.cluster = session.NewMockCluster(suite.T())
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
suite.jobScheduler = job.NewScheduler()
suite.taskScheduler = task.NewMockScheduler(suite.T())
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
@ -345,8 +345,9 @@ func (suite *ServiceSuite) TestLoadCollection() {
// Test load all collections
for _, collection := range suite.collections {
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
Return(nil, nil)
suite.expectGetRecoverInfo(collection)
suite.expectLoadPartitions()
req := &querypb.LoadCollectionRequest{
CollectionID: collection,
@ -914,7 +915,8 @@ func (suite *ServiceSuite) TestLoadPartition() {
// Test load all partitions
for _, collection := range suite.collections {
suite.expectLoadPartitions()
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
Return(nil, nil)
suite.expectGetRecoverInfo(collection)
req := &querypb.LoadPartitionsRequest{
@ -1009,9 +1011,6 @@ func (suite *ServiceSuite) TestReleaseCollection() {
ctx := context.Background()
server := suite.server
suite.cluster.EXPECT().ReleasePartitions(mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil)
// Test release all collections
for _, collection := range suite.collections {
req := &querypb.ReleaseCollectionRequest{
@ -1044,18 +1043,23 @@ func (suite *ServiceSuite) TestReleaseCollection() {
}
func (suite *ServiceSuite) TestReleasePartition() {
suite.loadAll()
ctx := context.Background()
suite.loadAll()
for _, collection := range suite.collections {
suite.updateChannelDist(ctx, collection)
suite.updateSegmentDist(collection, suite.nodes[0])
}
server := suite.server
// Test release all partitions
suite.cluster.EXPECT().ReleasePartitions(mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil)
for _, collection := range suite.collections {
req := &querypb.ReleasePartitionsRequest{
CollectionID: collection,
PartitionIDs: suite.partitions[collection][0:1],
}
suite.updateChannelDist(ctx, collection)
suite.updateSegmentDist(collection, suite.nodes[0], suite.partitions[collection][1:]...)
resp, err := server.ReleasePartitions(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
@ -1826,7 +1830,7 @@ func (suite *ServiceSuite) TestHandleNodeUp() {
func (suite *ServiceSuite) loadAll() {
ctx := context.Background()
for _, collection := range suite.collections {
suite.expectLoadPartitions()
suite.expectLoadMetaRPCs()
suite.expectGetRecoverInfo(collection)
if suite.loadTypes[collection] == querypb.LoadType_LoadCollection {
req := &querypb.LoadCollectionRequest{
@ -1839,7 +1843,6 @@ func (suite *ServiceSuite) loadAll() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -1864,7 +1867,6 @@ func (suite *ServiceSuite) loadAll() {
suite.dist,
suite.meta,
suite.broker,
suite.cluster,
suite.targetMgr,
suite.targetObserver,
suite.collectionObserver,
@ -1963,13 +1965,11 @@ func (suite *ServiceSuite) expectGetRecoverInfo(collection int64) {
Return(vChannels, segmentBinlogs, nil).Maybe()
}
func (suite *ServiceSuite) expectLoadPartitions() {
func (suite *ServiceSuite) expectLoadMetaRPCs() {
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
Return(nil, nil)
Return(nil, nil).Maybe()
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).
Return(nil, nil)
suite.cluster.EXPECT().LoadPartitions(mock.Anything, mock.Anything, mock.Anything).
Return(merr.Success(), nil)
Return(nil, nil).Maybe()
}
func (suite *ServiceSuite) getAllSegments(collection int64) []int64 {
@ -1980,9 +1980,13 @@ func (suite *ServiceSuite) getAllSegments(collection int64) []int64 {
return allSegments
}
func (suite *ServiceSuite) updateSegmentDist(collection, node int64) {
func (suite *ServiceSuite) updateSegmentDist(collection, node int64, partitions ...int64) {
partitionSet := typeutil.NewSet(partitions...)
metaSegments := make([]*meta.Segment, 0)
for partition, segments := range suite.segments[collection] {
if partitionSet.Len() > 0 && !partitionSet.Contain(partition) {
continue
}
for _, segment := range segments {
metaSegments = append(metaSegments,
utils.CreateTestSegment(collection, partition, segment, node, 1, "test-channel"))


@ -345,7 +345,7 @@ func (ex *Executor) subscribeChannel(task *ChannelTask, step int) error {
return err
}
loadFields := ex.meta.GetLoadFields(ctx, task.CollectionID())
partitions, err := utils.GetPartitions(ctx, ex.meta.CollectionManager, task.CollectionID())
partitions, err := utils.GetPartitions(ctx, ex.targetMgr, task.CollectionID())
if err != nil {
log.Warn("failed to get partitions of collection")
return err
@ -653,7 +653,7 @@ func (ex *Executor) getMetaInfo(ctx context.Context, task Task) (*milvuspb.Descr
return nil, nil, nil, err
}
loadFields := ex.meta.GetLoadFields(ctx, task.CollectionID())
partitions, err := utils.GetPartitions(ctx, ex.meta.CollectionManager, collectionID)
partitions, err := utils.GetPartitions(ctx, ex.targetMgr, collectionID)
if err != nil {
log.Warn("failed to get partitions of collection", zap.Error(err))
return nil, nil, nil, err


@ -30,18 +30,10 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func GetPartitions(ctx context.Context, collectionMgr *meta.CollectionManager, collectionID int64) ([]int64, error) {
collection := collectionMgr.GetCollection(ctx, collectionID)
if collection != nil {
partitions := collectionMgr.GetPartitionsByCollection(ctx, collectionID)
if partitions != nil {
return lo.Map(partitions, func(partition *meta.Partition, i int) int64 {
return partition.PartitionID
}), nil
}
}
return nil, merr.WrapErrCollectionNotLoaded(collectionID)
func GetPartitions(ctx context.Context, targetMgr meta.TargetManagerInterface, collectionID int64) ([]int64, error) {
// fetch the next target first, since the next target contains the wanted partition list
// if not found, the current target will be used instead for dist adjustment requests
return targetMgr.GetPartitions(ctx, collectionID, meta.NextTargetFirst)
}
// GroupNodesByReplica groups nodes by replica,


@ -82,7 +82,7 @@ type ShardDelegator interface {
LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error
LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error
ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error
SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)
SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)
GetTargetVersion() int64
GetDeleteBufferSize() (entryNum int64, memorySize int64)
@ -268,25 +268,6 @@ func (sd *shardDelegator) modifyQueryRequest(req *querypb.QueryRequest, scope qu
return nodeReq
}
func (sd *shardDelegator) getTargetPartitions(reqPartitions []int64) (searchPartitions []int64, err error) {
existPartitions := sd.collection.GetPartitions()
// search all loaded partitions if req partition ids not provided
if len(reqPartitions) == 0 {
searchPartitions = existPartitions
return searchPartitions, nil
}
// use brute search to avoid map struct cost
for _, partition := range reqPartitions {
if !funcutil.SliceContain(existPartitions, partition) {
return nil, merr.WrapErrPartitionNotLoaded(reqPartitions)
}
}
searchPartitions = reqPartitions
return searchPartitions, nil
}
// Search preforms search operation on shard.
func (sd *shardDelegator) search(ctx context.Context, req *querypb.SearchRequest, sealed []SnapshotItem, growing []SegmentEntry) ([]*internalpb.SearchResults, error) {
log := sd.getLogger(ctx)
@ -382,19 +363,10 @@ func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest
sealed, growing, version, err := sd.distribution.PinReadableSegments(req.GetReq().GetPartitionIDs()...)
if err != nil {
log.Warn("delegator failed to search, current distribution is not serviceable")
return nil, merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not servcieable")
}
defer sd.distribution.Unpin(version)
targetPartitions, err := sd.getTargetPartitions(req.GetReq().GetPartitionIDs())
if err != nil {
log.Warn("delegator failed to search, current distribution is not serviceable", zap.Error(err))
return nil, err
}
// set target partition ids to sub task request
req.Req.PartitionIDs = targetPartitions
growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool {
return funcutil.SliceContain(targetPartitions, segment.PartitionID)
})
defer sd.distribution.Unpin(version)
if req.GetReq().GetIsAdvanced() {
futures := make([]*conc.Future[*internalpb.SearchResults], len(req.GetReq().GetSubReqs()))
@ -499,21 +471,11 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq
sealed, growing, version, err := sd.distribution.PinReadableSegments(req.GetReq().GetPartitionIDs()...)
if err != nil {
log.Warn("delegator failed to query, current distribution is not serviceable")
return merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not servcieable")
log.Warn("delegator failed to query, current distribution is not serviceable", zap.Error(err))
return err
}
defer sd.distribution.Unpin(version)
targetPartitions, err := sd.getTargetPartitions(req.GetReq().GetPartitionIDs())
if err != nil {
return err
}
// set target partition ids to sub task request
req.Req.PartitionIDs = targetPartitions
growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool {
return funcutil.SliceContain(targetPartitions, segment.PartitionID)
})
if req.Req.IgnoreGrowing {
growing = []SegmentEntry{}
}
@ -572,24 +534,13 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest)
sealed, growing, version, err := sd.distribution.PinReadableSegments(req.GetReq().GetPartitionIDs()...)
if err != nil {
log.Warn("delegator failed to query, current distribution is not serviceable")
return nil, merr.WrapErrChannelNotAvailable(sd.vchannelName, "distribution is not servcieable")
log.Warn("delegator failed to query, current distribution is not serviceable", zap.Error(err))
return nil, err
}
defer sd.distribution.Unpin(version)
targetPartitions, err := sd.getTargetPartitions(req.GetReq().GetPartitionIDs())
if err != nil {
return nil, err
}
// set target partition ids to sub task request
req.Req.PartitionIDs = targetPartitions
if req.Req.IgnoreGrowing {
growing = []SegmentEntry{}
} else {
growing = lo.Filter(growing, func(segment SegmentEntry, _ int) bool {
return funcutil.SliceContain(targetPartitions, segment.PartitionID)
})
}
if paramtable.Get().QueryNodeCfg.EnableSegmentPrune.GetAsBool() {


@ -948,7 +948,7 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
return nil
}
func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []int64,
func (sd *shardDelegator) SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64,
sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition,
) {
growings := sd.segmentManager.GetBy(
@ -980,7 +980,7 @@ func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []
log.Warn("found redundant growing segments",
zap.Int64s("growingSegments", redundantGrowingIDs))
}
sd.distribution.SyncTargetVersion(newVersion, growingInTarget, sealedInTarget, redundantGrowingIDs)
sd.distribution.SyncTargetVersion(newVersion, partitions, growingInTarget, sealedInTarget, redundantGrowingIDs)
sd.deleteBuffer.TryDiscard(checkpoint.GetTimestamp())
}


@ -1363,9 +1363,10 @@ func (s *DelegatorDataSuite) TestReleaseSegment() {
s.ElementsMatch([]SegmentEntry{
{
SegmentID: 1001,
NodeID: 1,
PartitionID: 500,
SegmentID: 1001,
NodeID: 1,
PartitionID: 500,
TargetVersion: unreadableTargetVersion,
},
}, growing)
@ -1503,7 +1504,7 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() {
s.manager.Segment.Put(context.Background(), segments.SegmentTypeGrowing, ms)
}
s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{2}, []int64{3, 4}, &msgpb.MsgPosition{})
s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{1}, []int64{2}, []int64{3, 4}, &msgpb.MsgPosition{})
s.Equal(int64(5), s.delegator.GetTargetVersion())
}


@ -331,7 +331,7 @@ func (s *DelegatorSuite) initSegments() {
Version: 2001,
},
)
s.delegator.SyncTargetVersion(2001, []int64{1004}, []int64{1000, 1001, 1002, 1003}, []int64{}, &msgpb.MsgPosition{})
s.delegator.SyncTargetVersion(2001, []int64{500, 501}, []int64{1004}, []int64{1000, 1001, 1002, 1003}, []int64{}, &msgpb.MsgPosition{})
}
func (s *DelegatorSuite) TestSearch() {


@ -174,18 +174,21 @@ func (s *StreamingForwardSuite) TestBFStreamingForward() {
// Setup distribution
delegator.distribution.AddGrowing(SegmentEntry{
NodeID: 1,
SegmentID: 100,
NodeID: 1,
PartitionID: 1,
SegmentID: 100,
})
delegator.distribution.AddDistributions(SegmentEntry{
NodeID: 1,
SegmentID: 101,
NodeID: 1,
PartitionID: 1,
SegmentID: 101,
})
delegator.distribution.AddDistributions(SegmentEntry{
NodeID: 1,
SegmentID: 102,
NodeID: 1,
PartitionID: 1,
SegmentID: 102,
})
delegator.distribution.SyncTargetVersion(1, []int64{100}, []int64{101, 102}, nil)
delegator.distribution.SyncTargetVersion(1, []int64{1}, []int64{100}, []int64{101, 102}, nil)
// Setup pk oracle
// empty bfs will not match
@ -224,18 +227,21 @@ func (s *StreamingForwardSuite) TestDirectStreamingForward() {
// Setup distribution
delegator.distribution.AddGrowing(SegmentEntry{
NodeID: 1,
SegmentID: 100,
NodeID: 1,
PartitionID: 1,
SegmentID: 100,
})
delegator.distribution.AddDistributions(SegmentEntry{
NodeID: 1,
SegmentID: 101,
NodeID: 1,
PartitionID: 1,
SegmentID: 101,
})
delegator.distribution.AddDistributions(SegmentEntry{
NodeID: 1,
SegmentID: 102,
NodeID: 1,
PartitionID: 1,
SegmentID: 102,
})
delegator.distribution.SyncTargetVersion(1, []int64{100}, []int64{101, 102}, nil)
delegator.distribution.SyncTargetVersion(1, []int64{1}, []int64{100}, []int64{101, 102}, nil)
// Setup pk oracle
// empty bfs will not match


@ -117,10 +117,17 @@ func (d *distribution) PinReadableSegments(partitions ...int64) (sealed []Snapsh
defer d.mut.RUnlock()
if !d.Serviceable() {
return nil, nil, -1, merr.WrapErrServiceInternal("channel distribution is not serviceable")
return nil, nil, -1, merr.WrapErrChannelNotAvailable("channel distribution is not serviceable")
}
current := d.current.Load()
// snapshot sanity check
// if user specified a partition id which is not serviceable, return err
for _, partition := range partitions {
if !current.partitions.Contain(partition) {
return nil, nil, -1, merr.WrapErrPartitionNotLoaded(partition)
}
}
sealed, growing = current.Get(partitions...)
version = current.version
targetVersion := current.GetTargetVersion()
@ -261,7 +268,7 @@ func (d *distribution) AddOfflines(segmentIDs ...int64) {
}
// UpdateTargetVersion update readable segment version
func (d *distribution) SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, redundantGrowings []int64) {
func (d *distribution) SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, redundantGrowings []int64) {
d.mut.Lock()
defer d.mut.Unlock()
@ -299,10 +306,12 @@ func (d *distribution) SyncTargetVersion(newVersion int64, growingInTarget []int
oldValue := d.targetVersion.Load()
d.targetVersion.Store(newVersion)
d.genSnapshot()
// update working partition list
d.genSnapshot(WithPartitions(partitions))
// if sealed segment in leader view is less than sealed segment in target, set delegator to unserviceable
d.serviceable.Store(available)
log.Info("Update readable segment version",
zap.Int64s("partitions", partitions),
zap.Int64("oldVersion", oldValue),
zap.Int64("newVersion", newVersion),
zap.Int("growingSegmentNum", len(growingInTarget)),
@ -345,33 +354,55 @@ func (d *distribution) RemoveDistributions(sealedSegments []SegmentEntry, growin
// getSnapshot converts current distribution to snapshot format.
// in which, user could use found nodeID=>segmentID list.
// mutex RLock is required before calling this method.
func (d *distribution) genSnapshot() chan struct{} {
func (d *distribution) genSnapshot(opts ...genSnapshotOpt) chan struct{} {
// stores last snapshot
// ok to be nil
last := d.current.Load()
option := &genSnapshotOption{
partitions: typeutil.NewSet[int64](), // if no working list provided, snapshot shall have no item
}
// use last snapshot's working partition list by default
if last != nil {
option.partitions = last.partitions
}
for _, opt := range opts {
opt(option)
}
nodeSegments := make(map[int64][]SegmentEntry)
for _, entry := range d.sealedSegments {
nodeSegments[entry.NodeID] = append(nodeSegments[entry.NodeID], entry)
}
// keep only working-partition entries readable in the snapshot to reduce calculation
dist := make([]SnapshotItem, 0, len(nodeSegments))
for nodeID, items := range nodeSegments {
dist = append(dist, SnapshotItem{
NodeID: nodeID,
Segments: items,
NodeID: nodeID,
Segments: lo.Map(items, func(entry SegmentEntry, _ int) SegmentEntry {
if !option.partitions.Contain(entry.PartitionID) {
entry.TargetVersion = unreadableTargetVersion
}
return entry
}),
})
}
growing := make([]SegmentEntry, 0, len(d.growingSegments))
for _, entry := range d.growingSegments {
if !option.partitions.Contain(entry.PartitionID) {
entry.TargetVersion = unreadableTargetVersion
}
growing = append(growing, entry)
}
d.serviceable.Store(d.offlines.Len() == 0)
// stores last snapshot
// ok to be nil
last := d.current.Load()
// update snapshot version
d.snapshotVersion++
newSnapShot := NewSnapshot(dist, growing, last, d.snapshotVersion, d.targetVersion.Load())
newSnapShot.partitions = option.partitions
d.current.Store(newSnapShot)
// shall be a new one
d.snapshots.GetOrInsert(d.snapshotVersion, newSnapShot)
@ -404,3 +435,15 @@ func (d *distribution) getCleanup(version int64) snapshotCleanup {
d.snapshots.GetAndRemove(version)
}
}
type genSnapshotOption struct {
partitions typeutil.Set[int64]
}
type genSnapshotOpt func(*genSnapshotOption)
func WithPartitions(partitions []int64) genSnapshotOpt {
return func(opt *genSnapshotOption) {
opt.partitions = typeutil.NewSet(partitions...)
}
}
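
genSnapshotOption/WithPartitions follow Go's functional-option pattern: with no option passed, the previous snapshot's working partition list is reused, and SyncTargetVersion overrides it with the partitions carried by the new target. A minimal, self-contained sketch of that default-then-override behavior, using simplified stand-in types rather than the actual distribution code:

package main

import "fmt"

type snapshotOption struct {
	partitions map[int64]struct{}
}

type snapshotOpt func(*snapshotOption)

// withPartitions overrides the working partition set, mirroring WithPartitions above.
func withPartitions(partitions ...int64) snapshotOpt {
	return func(o *snapshotOption) {
		o.partitions = make(map[int64]struct{}, len(partitions))
		for _, p := range partitions {
			o.partitions[p] = struct{}{}
		}
	}
}

// buildOption inherits the previous working list by default, then applies options.
func buildOption(last map[int64]struct{}, opts ...snapshotOpt) *snapshotOption {
	o := &snapshotOption{partitions: map[int64]struct{}{}}
	if last != nil {
		o.partitions = last
	}
	for _, opt := range opts {
		opt(o)
	}
	return o
}

func main() {
	last := map[int64]struct{}{1: {}}
	fmt.Println(len(buildOption(last).partitions))                       // 1 (inherited)
	fmt.Println(len(buildOption(last, withPartitions(2, 3)).partitions)) // 2 (overridden)
}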

View File

@ -217,9 +217,10 @@ func (s *DistributionSuite) compareSnapshotItems(target, value []SnapshotItem) {
func (s *DistributionSuite) TestAddGrowing() {
type testCase struct {
tag string
input []SegmentEntry
expected []SegmentEntry
tag string
workingParts []int64
input []SegmentEntry
expected []SegmentEntry
}
cases := []testCase{
@ -229,15 +230,27 @@ func (s *DistributionSuite) TestAddGrowing() {
expected: []SegmentEntry{},
},
{
tag: "normal case",
tag: "normal_case",
input: []SegmentEntry{
{SegmentID: 1, PartitionID: 1},
{SegmentID: 2, PartitionID: 2},
},
workingParts: []int64{1, 2},
expected: []SegmentEntry{
{SegmentID: 1, PartitionID: 1, TargetVersion: 1000},
{SegmentID: 2, PartitionID: 2, TargetVersion: 1000},
},
},
{
tag: "partial_partition_working",
input: []SegmentEntry{
{SegmentID: 1, PartitionID: 1},
{SegmentID: 2, PartitionID: 2},
},
workingParts: []int64{1},
expected: []SegmentEntry{
{SegmentID: 1, PartitionID: 1, TargetVersion: 1000},
},
},
}
@ -247,6 +260,7 @@ func (s *DistributionSuite) TestAddGrowing() {
defer s.TearDownTest()
s.dist.AddGrowing(tc.input...)
s.dist.SyncTargetVersion(1000, tc.workingParts, []int64{1, 2}, nil, nil)
_, growing, version, err := s.dist.PinReadableSegments()
s.Require().NoError(err)
defer s.dist.Unpin(version)
@ -305,7 +319,7 @@ func (s *DistributionSuite) TestRemoveDistribution() {
},
},
},
expectGrowing: []SegmentEntry{{SegmentID: 4}},
expectGrowing: []SegmentEntry{{SegmentID: 4, TargetVersion: unreadableTargetVersion}},
},
{
tag: "remove with wrong nodeID",
@ -341,7 +355,7 @@ func (s *DistributionSuite) TestRemoveDistribution() {
},
},
},
expectGrowing: []SegmentEntry{{SegmentID: 4}, {SegmentID: 5}},
expectGrowing: []SegmentEntry{{SegmentID: 4, TargetVersion: unreadableTargetVersion}, {SegmentID: 5, TargetVersion: unreadableTargetVersion}},
},
{
tag: "remove with wildcardNodeID",
@ -376,7 +390,7 @@ func (s *DistributionSuite) TestRemoveDistribution() {
},
},
},
expectGrowing: []SegmentEntry{{SegmentID: 4}, {SegmentID: 5}},
expectGrowing: []SegmentEntry{{SegmentID: 4, TargetVersion: unreadableTargetVersion}, {SegmentID: 5, TargetVersion: unreadableTargetVersion}},
},
{
tag: "remove with read",
@ -421,7 +435,7 @@ func (s *DistributionSuite) TestRemoveDistribution() {
},
},
},
expectGrowing: []SegmentEntry{{SegmentID: 4}},
expectGrowing: []SegmentEntry{{SegmentID: 4, TargetVersion: unreadableTargetVersion}},
},
}
@ -714,7 +728,7 @@ func (s *DistributionSuite) Test_SyncTargetVersion() {
s.dist.AddGrowing(growing...)
s.dist.AddDistributions(sealed...)
s.dist.SyncTargetVersion(2, []int64{2, 3}, []int64{6}, []int64{})
s.dist.SyncTargetVersion(2, []int64{1}, []int64{2, 3}, []int64{6}, []int64{})
s1, s2, _, err := s.dist.PinReadableSegments()
s.Require().NoError(err)
@ -726,13 +740,13 @@ func (s *DistributionSuite) Test_SyncTargetVersion() {
s.Len(s2, 3)
s.dist.serviceable.Store(true)
s.dist.SyncTargetVersion(2, []int64{222}, []int64{}, []int64{})
s.dist.SyncTargetVersion(2, []int64{1}, []int64{222}, []int64{}, []int64{})
s.True(s.dist.Serviceable())
s.dist.SyncTargetVersion(2, []int64{}, []int64{333}, []int64{})
s.dist.SyncTargetVersion(2, []int64{1}, []int64{}, []int64{333}, []int64{})
s.False(s.dist.Serviceable())
s.dist.SyncTargetVersion(2, []int64{}, []int64{333}, []int64{1, 2, 3})
s.dist.SyncTargetVersion(2, []int64{1}, []int64{}, []int64{333}, []int64{1, 2, 3})
_, _, _, err = s.dist.PinReadableSegments()
s.Error(err)
}

View File

@ -940,9 +940,9 @@ func (_c *MockShardDelegator_SyncPartitionStats_Call) RunAndReturn(run func(cont
return _c
}
// SyncTargetVersion provides a mock function with given fields: newVersion, growingInTarget, sealedInTarget, droppedInTarget, checkpoint
func (_m *MockShardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition) {
_m.Called(newVersion, growingInTarget, sealedInTarget, droppedInTarget, checkpoint)
// SyncTargetVersion provides a mock function with given fields: newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint
func (_m *MockShardDelegator) SyncTargetVersion(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition) {
_m.Called(newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint)
}
// MockShardDelegator_SyncTargetVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncTargetVersion'
@ -952,17 +952,18 @@ type MockShardDelegator_SyncTargetVersion_Call struct {
// SyncTargetVersion is a helper method to define mock.On call
// - newVersion int64
// - partitions []int64
// - growingInTarget []int64
// - sealedInTarget []int64
// - droppedInTarget []int64
// - checkpoint *msgpb.MsgPosition
func (_e *MockShardDelegator_Expecter) SyncTargetVersion(newVersion interface{}, growingInTarget interface{}, sealedInTarget interface{}, droppedInTarget interface{}, checkpoint interface{}) *MockShardDelegator_SyncTargetVersion_Call {
return &MockShardDelegator_SyncTargetVersion_Call{Call: _e.mock.On("SyncTargetVersion", newVersion, growingInTarget, sealedInTarget, droppedInTarget, checkpoint)}
func (_e *MockShardDelegator_Expecter) SyncTargetVersion(newVersion interface{}, partitions interface{}, growingInTarget interface{}, sealedInTarget interface{}, droppedInTarget interface{}, checkpoint interface{}) *MockShardDelegator_SyncTargetVersion_Call {
return &MockShardDelegator_SyncTargetVersion_Call{Call: _e.mock.On("SyncTargetVersion", newVersion, partitions, growingInTarget, sealedInTarget, droppedInTarget, checkpoint)}
}
func (_c *MockShardDelegator_SyncTargetVersion_Call) Run(run func(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call {
func (_c *MockShardDelegator_SyncTargetVersion_Call) Run(run func(newVersion int64, partitions []int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64), args[1].([]int64), args[2].([]int64), args[3].([]int64), args[4].(*msgpb.MsgPosition))
run(args[0].(int64), args[1].([]int64), args[2].([]int64), args[3].([]int64), args[4].([]int64), args[5].(*msgpb.MsgPosition))
})
return _c
}
@ -972,7 +973,7 @@ func (_c *MockShardDelegator_SyncTargetVersion_Call) Return() *MockShardDelegato
return _c
}
func (_c *MockShardDelegator_SyncTargetVersion_Call) RunAndReturn(run func(int64, []int64, []int64, []int64, *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call {
func (_c *MockShardDelegator_SyncTargetVersion_Call) RunAndReturn(run func(int64, []int64, []int64, []int64, []int64, *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call {
_c.Call.Return(run)
return _c
}

View File

@ -23,6 +23,7 @@ import (
"go.uber.org/atomic"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// SnapshotItem groups a segmentEntry slice
@ -36,6 +37,7 @@ type snapshotCleanup func()
// snapshot records segment distribution with ref count.
type snapshot struct {
partitions typeutil.Set[int64]
dist []SnapshotItem
growing []SegmentEntry
targetVersion int64
@ -60,6 +62,7 @@ type snapshot struct {
// NewSnapshot returns a prepared snapshot with channel initialized.
func NewSnapshot(sealed []SnapshotItem, growing []SegmentEntry, last *snapshot, version int64, targetVersion int64) *snapshot {
return &snapshot{
partitions: typeutil.NewSet[int64](),
version: version,
growing: growing,
dist: sealed,

View File

@ -18,45 +18,20 @@ package segments
import (
"context"
"fmt"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
)
func validate(ctx context.Context, manager *Manager, collectionID int64, partitionIDs []int64, segmentIDs []int64, segmentFilter SegmentFilter) ([]Segment, error) {
var searchPartIDs []int64
collection := manager.Collection.Get(collectionID)
if collection == nil {
return nil, merr.WrapErrCollectionNotFound(collectionID)
}
// validate partition
// no partition id specified, get all partition ids in collection
if len(partitionIDs) == 0 {
searchPartIDs = collection.GetPartitions()
} else {
// use request partition ids directly, ignoring meta partition ids
// partitions shall be controlled by delegator distribution
searchPartIDs = partitionIDs
}
log.Ctx(ctx).Debug("read target partitions", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", searchPartIDs))
// all partitions have been released
if len(searchPartIDs) == 0 && collection.GetLoadType() == querypb.LoadType_LoadPartition {
return nil, errors.Newf("partitions have been released , collectionID = %d target partitionIDs = %v", collectionID, searchPartIDs)
}
if len(searchPartIDs) == 0 && collection.GetLoadType() == querypb.LoadType_LoadCollection {
return []Segment{}, nil
}
log.Ctx(ctx).Debug("read target partitions", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
// validate segment
segments := make([]Segment, 0, len(segmentIDs))
@ -67,23 +42,18 @@ func validate(ctx context.Context, manager *Manager, collectionID int64, partiti
}
}()
if len(segmentIDs) == 0 {
for _, partID := range searchPartIDs {
segments, err = manager.Segment.GetAndPinBy(WithPartition(partID), segmentFilter)
if err != nil {
return nil, err
}
// legacy logic
segments, err = manager.Segment.GetAndPinBy(segmentFilter, SegmentFilterFunc(func(s Segment) bool {
return s.Collection() == collectionID
}))
if err != nil {
return nil, err
}
} else {
segments, err = manager.Segment.GetAndPin(segmentIDs, segmentFilter)
if err != nil {
return nil, err
}
for _, segment := range segments {
if !funcutil.SliceContain(searchPartIDs, segment.Partition()) {
err = fmt.Errorf("segment %d belongs to partition %d, which is not in %v", segment.ID(), segment.Partition(), searchPartIDs)
return nil, err
}
}
}
return segments, nil
}
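
With partition validation now handled by the delegator distribution, validate only narrows the pinned segments to the requested collection (the "legacy logic" branch above). A minimal sketch of that kind of collection-scoped predicate filter, assuming simplified stand-in types rather than the querynode Segment/SegmentFilter API:

package main

import "fmt"

type segment struct {
	ID           int64
	CollectionID int64
	PartitionID  int64
}

// filterByCollection keeps only segments that belong to the requested collection,
// leaving partition-level filtering to the delegator distribution.
func filterByCollection(candidates []segment, collectionID int64) []segment {
	out := make([]segment, 0, len(candidates))
	for _, s := range candidates {
		if s.CollectionID == collectionID {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	segs := []segment{{ID: 1, CollectionID: 100}, {ID: 2, CollectionID: 200}}
	fmt.Println(filterByCollection(segs, 100)) // [{1 100 0}]
}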

View File

@ -1292,7 +1292,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
})
})
case querypb.SyncType_UpdateVersion:
log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion()))
log.Info("sync action", zap.Int64("TargetVersion", action.GetTargetVersion()), zap.Int64s("partitions", req.GetLoadMeta().GetPartitionIDs()))
droppedInfos := lo.SliceToMap(action.GetDroppedInTarget(), func(id int64) (int64, uint64) {
if action.GetCheckpoint() == nil {
return id, typeutil.MaxTimestamp
@ -1307,7 +1307,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
return id, action.GetCheckpoint().Timestamp
})
shardDelegator.AddExcludedSegments(flushedInfo)
shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(),
shardDelegator.SyncTargetVersion(action.GetTargetVersion(), req.GetLoadMeta().GetPartitionIDs(), action.GetGrowingInTarget(),
action.GetSealedInTarget(), action.GetDroppedInTarget(), action.GetCheckpoint())
case querypb.SyncType_UpdatePartitionStats:
log.Info("sync update partition stats versions")

View File

@ -1162,6 +1162,20 @@ func (suite *ServiceSuite) TestGetSegmentInfo_Failed() {
suite.Equal(commonpb.ErrorCode_NotReadyServe, rsp.GetStatus().GetErrorCode())
}
func (suite *ServiceSuite) syncDistribution(ctx context.Context) {
suite.node.SyncDistribution(ctx, &querypb.SyncDistributionRequest{
Channel: suite.vchannel,
CollectionID: suite.collectionID,
LoadMeta: &querypb.LoadMetaInfo{
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,
},
Actions: []*querypb.SyncAction{
{Type: querypb.SyncType_UpdateVersion, SealedInTarget: suite.validSegmentIDs, TargetVersion: time.Now().UnixNano()},
},
})
}
// Test Search
func (suite *ServiceSuite) genCSearchRequest(nq int64, dataType schemapb.DataType, fieldID int64, metricType string, isTopkReduce bool) (*internalpb.SearchRequest, error) {
placeHolder, err := genPlaceHolderGroup(nq)
@ -1196,6 +1210,7 @@ func (suite *ServiceSuite) TestSearch_Normal() {
// pre
suite.TestWatchDmChannelsInt64()
suite.TestLoadSegments_Int64()
suite.syncDistribution(ctx)
creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType, false)
req := &querypb.SearchRequest{
@ -1216,6 +1231,7 @@ func (suite *ServiceSuite) TestSearch_Concurrent() {
// pre
suite.TestWatchDmChannelsInt64()
suite.TestLoadSegments_Int64()
suite.syncDistribution(ctx)
concurrency := 16
futures := make([]*conc.Future[*internalpb.SearchResults], 0, concurrency)
@ -1278,6 +1294,7 @@ func (suite *ServiceSuite) TestSearch_Failed() {
suite.TestWatchDmChannelsInt64()
suite.TestLoadSegments_Int64()
// suite.syncDistribution(ctx)
// sync segment data
syncReq := &querypb.SyncDistributionRequest{
@ -1287,6 +1304,10 @@ func (suite *ServiceSuite) TestSearch_Failed() {
},
CollectionID: suite.collectionID,
Channel: suite.vchannel,
LoadMeta: &querypb.LoadMetaInfo{
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,
},
}
syncVersionAction := &querypb.SyncAction{
@ -1458,6 +1479,7 @@ func (suite *ServiceSuite) TestQuery_Normal() {
// pre
suite.TestWatchDmChannelsInt64()
suite.TestLoadSegments_Int64()
suite.syncDistribution(ctx)
// data
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, false)
@ -1539,6 +1561,7 @@ func (suite *ServiceSuite) TestQueryStream_Normal() {
// prepare
suite.TestWatchDmChannelsInt64()
suite.TestLoadSegments_Int64()
suite.syncDistribution(ctx)
// data
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, false)