mirror of https://github.com/milvus-io/milvus.git
fix: Fix update loading collection's load config doesn't work (#38737)
issue: #38594 pr: #38595 Signed-off-by: Wei Liu <wei.liu@zilliz.com>pull/38678/head
parent
52de43dbeb
commit
b16d04d7cc
|
@ -253,7 +253,7 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
|
||||||
|
|
||||||
var loadJob job.Job
|
var loadJob job.Job
|
||||||
collection := s.meta.GetCollection(ctx, req.GetCollectionID())
|
collection := s.meta.GetCollection(ctx, req.GetCollectionID())
|
||||||
if collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded {
|
if collection != nil {
|
||||||
// if collection is loaded, check if collection is loaded with the same replica number and resource groups
|
// if collection is loaded, check if collection is loaded with the same replica number and resource groups
|
||||||
// if replica number or resource group changes, switch to update load config
|
// if replica number or resource group changes, switch to update load config
|
||||||
collectionUsedRG := s.meta.ReplicaManager.GetResourceGroupByCollection(ctx, collection.GetCollectionID()).Collect()
|
collectionUsedRG := s.meta.ReplicaManager.GetResourceGroupByCollection(ctx, collection.GetCollectionID()).Collect()
|
||||||
|
@ -1180,7 +1180,7 @@ func (s *Server) UpdateLoadConfig(ctx context.Context, req *querypb.UpdateLoadCo
|
||||||
jobs := make([]job.Job, 0, len(req.GetCollectionIDs()))
|
jobs := make([]job.Job, 0, len(req.GetCollectionIDs()))
|
||||||
for _, collectionID := range req.GetCollectionIDs() {
|
for _, collectionID := range req.GetCollectionIDs() {
|
||||||
collection := s.meta.GetCollection(ctx, collectionID)
|
collection := s.meta.GetCollection(ctx, collectionID)
|
||||||
if collection == nil || collection.GetStatus() != querypb.LoadStatus_Loaded {
|
if collection == nil {
|
||||||
err := merr.WrapErrCollectionNotLoaded(collectionID)
|
err := merr.WrapErrCollectionNotLoaded(collectionID)
|
||||||
log.Warn("failed to update load config", zap.Error(err))
|
log.Warn("failed to update load config", zap.Error(err))
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -857,58 +857,6 @@ func (suite *ServiceSuite) TestTransferReplica() {
|
||||||
suite.ErrorIs(merr.Error(resp), merr.ErrServiceNotReady)
|
suite.ErrorIs(merr.Error(resp), merr.ErrServiceNotReady)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *ServiceSuite) TestLoadCollectionFailed() {
|
|
||||||
suite.loadAll()
|
|
||||||
ctx := context.Background()
|
|
||||||
server := suite.server
|
|
||||||
|
|
||||||
// Test load with different replica number
|
|
||||||
for _, collection := range suite.collections {
|
|
||||||
req := &querypb.LoadCollectionRequest{
|
|
||||||
CollectionID: collection,
|
|
||||||
ReplicaNumber: suite.replicaNumber[collection] + 1,
|
|
||||||
}
|
|
||||||
resp, err := server.LoadCollection(ctx, req)
|
|
||||||
suite.NoError(err)
|
|
||||||
suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid)
|
|
||||||
}
|
|
||||||
|
|
||||||
req := &querypb.LoadCollectionRequest{
|
|
||||||
CollectionID: 1001,
|
|
||||||
ReplicaNumber: 2,
|
|
||||||
ResourceGroups: []string{meta.DefaultResourceGroupName, "rg"},
|
|
||||||
}
|
|
||||||
resp, err := server.LoadCollection(ctx, req)
|
|
||||||
suite.NoError(err)
|
|
||||||
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
|
|
||||||
|
|
||||||
// Test load with partitions loaded
|
|
||||||
for _, collection := range suite.collections {
|
|
||||||
if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
req := &querypb.LoadCollectionRequest{
|
|
||||||
CollectionID: collection,
|
|
||||||
}
|
|
||||||
resp, err := server.LoadCollection(ctx, req)
|
|
||||||
suite.NoError(err)
|
|
||||||
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test load with wrong rg num
|
|
||||||
for _, collection := range suite.collections {
|
|
||||||
req := &querypb.LoadCollectionRequest{
|
|
||||||
CollectionID: collection,
|
|
||||||
ReplicaNumber: suite.replicaNumber[collection] + 1,
|
|
||||||
ResourceGroups: []string{"rg1", "rg2"},
|
|
||||||
}
|
|
||||||
resp, err := server.LoadCollection(ctx, req)
|
|
||||||
suite.NoError(err)
|
|
||||||
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (suite *ServiceSuite) TestLoadPartition() {
|
func (suite *ServiceSuite) TestLoadPartition() {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
server := suite.server
|
server := suite.server
|
||||||
|
|
|
@ -854,6 +854,123 @@ func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_WithRGLackOfNode() {
|
||||||
s.releaseCollection(dbName, collectionName)
|
s.releaseCollection(dbName, collectionName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_OnLoadingCollection() {
|
||||||
|
ctx := context.Background()
|
||||||
|
s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
|
||||||
|
DBName: dbName,
|
||||||
|
Dim: dim,
|
||||||
|
CollectionName: collectionName,
|
||||||
|
ChannelNum: 1,
|
||||||
|
SegmentNum: 1,
|
||||||
|
RowNumPerSegment: 2000,
|
||||||
|
})
|
||||||
|
|
||||||
|
// prepare resource groups
|
||||||
|
rgNum := 10
|
||||||
|
rgs := make([]string, 0)
|
||||||
|
for i := 0; i < rgNum; i++ {
|
||||||
|
rgs = append(rgs, fmt.Sprintf("rg_%d", i))
|
||||||
|
s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
|
||||||
|
ResourceGroup: rgs[i],
|
||||||
|
Config: &rgpb.ResourceGroupConfig{
|
||||||
|
Requests: &rgpb.ResourceGroupLimit{
|
||||||
|
NodeNum: 1,
|
||||||
|
},
|
||||||
|
Limits: &rgpb.ResourceGroupLimit{
|
||||||
|
NodeNum: 1,
|
||||||
|
},
|
||||||
|
|
||||||
|
TransferFrom: []*rgpb.ResourceGroupTransfer{
|
||||||
|
{
|
||||||
|
ResourceGroup: meta.DefaultResourceGroupName,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
TransferTo: []*rgpb.ResourceGroupTransfer{
|
||||||
|
{
|
||||||
|
ResourceGroup: meta.DefaultResourceGroupName,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
|
||||||
|
s.NoError(err)
|
||||||
|
s.True(merr.Ok(resp.GetStatus()))
|
||||||
|
s.Len(resp.GetResourceGroups(), rgNum+1)
|
||||||
|
|
||||||
|
for i := 1; i < rgNum; i++ {
|
||||||
|
s.Cluster.AddQueryNode()
|
||||||
|
}
|
||||||
|
|
||||||
|
nodesInRG := make(map[string][]int64)
|
||||||
|
s.Eventually(func() bool {
|
||||||
|
matchCounter := 0
|
||||||
|
for _, rg := range rgs {
|
||||||
|
resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
|
||||||
|
ResourceGroup: rg,
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.True(merr.Ok(resp.GetStatus()))
|
||||||
|
if len(resp1.ResourceGroup.Nodes) == 1 {
|
||||||
|
matchCounter += 1
|
||||||
|
nodesInRG[rg] = []int64{resp1.ResourceGroup.Nodes[0].NodeId}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return matchCounter == rgNum
|
||||||
|
}, 30*time.Second, time.Second)
|
||||||
|
|
||||||
|
// trigger collection loading, and modify collection's load config during loading
|
||||||
|
loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
|
||||||
|
DbName: dbName,
|
||||||
|
CollectionName: collectionName,
|
||||||
|
ReplicaNumber: 1,
|
||||||
|
ResourceGroups: rgs[:1],
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.True(merr.Ok(loadStatus))
|
||||||
|
loadStatus, err = s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
|
||||||
|
DbName: dbName,
|
||||||
|
CollectionName: collectionName,
|
||||||
|
ReplicaNumber: 3,
|
||||||
|
ResourceGroups: rgs[1:4],
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.True(merr.Ok(loadStatus))
|
||||||
|
loadStatus, err = s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
|
||||||
|
DbName: dbName,
|
||||||
|
CollectionName: collectionName,
|
||||||
|
ReplicaNumber: 5,
|
||||||
|
ResourceGroups: rgs[4:9],
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.True(merr.Ok(loadStatus))
|
||||||
|
|
||||||
|
s.Eventually(func() bool {
|
||||||
|
resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
|
||||||
|
DbName: dbName,
|
||||||
|
CollectionName: collectionName,
|
||||||
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
s.True(merr.Ok(resp3.Status))
|
||||||
|
return len(resp3.GetReplicas()) == 5
|
||||||
|
}, 30*time.Second, 1*time.Second)
|
||||||
|
|
||||||
|
s.Eventually(func() bool {
|
||||||
|
segmentNum, channelNum := 0, 0
|
||||||
|
for _, qn := range s.Cluster.GetAllQueryNodes() {
|
||||||
|
resp, err := qn.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
|
||||||
|
s.NoError(err)
|
||||||
|
s.True(merr.Ok(resp.Status))
|
||||||
|
segmentNum += len(resp.Segments)
|
||||||
|
channelNum += len(resp.Channels)
|
||||||
|
}
|
||||||
|
return segmentNum == 5 && channelNum == 5
|
||||||
|
}, 30*time.Second, 1*time.Second)
|
||||||
|
|
||||||
|
s.releaseCollection(dbName, collectionName)
|
||||||
|
}
|
||||||
|
|
||||||
func TestReplicas(t *testing.T) {
|
func TestReplicas(t *testing.T) {
|
||||||
suite.Run(t, new(LoadTestSuite))
|
suite.Run(t, new(LoadTestSuite))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue