mirror of https://github.com/milvus-io/milvus.git
fix: Fill load field list from old version load info (#35993)
See also #35959 --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/36009/head
parent
c61eea737b
commit
f985173da0
|
@ -23,13 +23,16 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/metastore"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/eventlog"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
|
@ -156,6 +159,16 @@ func (m *CollectionManager) Recover(broker Broker) error {
|
|||
continue
|
||||
}
|
||||
|
||||
err := m.upgradeLoadFields(collection, broker)
|
||||
if err != nil {
|
||||
if errors.Is(err, merr.ErrCollectionNotFound) {
|
||||
log.Warn("collection not found, skip upgrade logic and wait for release")
|
||||
} else {
|
||||
log.Warn("upgrade load field failed", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
m.collections[collection.CollectionID] = &Collection{
|
||||
CollectionLoadInfo: collection,
|
||||
}
|
||||
|
@ -194,6 +207,36 @@ func (m *CollectionManager) Recover(broker Broker) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *CollectionManager) upgradeLoadFields(collection *querypb.CollectionLoadInfo, broker Broker) error {
|
||||
// only fill load fields when value is nil
|
||||
if collection.LoadFields != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// invoke describe collection to get collection schema
|
||||
resp, err := broker.DescribeCollection(context.Background(), collection.CollectionID)
|
||||
if err := merr.CheckRPCCall(resp, err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// fill all field id as legacy default behavior
|
||||
collection.LoadFields = lo.FilterMap(resp.GetSchema().GetFields(), func(fieldSchema *schemapb.FieldSchema, _ int) (int64, bool) {
|
||||
// load fields list excludes system fields
|
||||
return fieldSchema.GetFieldID(), !common.IsSystemField(fieldSchema.GetFieldID())
|
||||
})
|
||||
|
||||
// put updated meta back to store
|
||||
err = m.putCollection(true, &Collection{
|
||||
CollectionLoadInfo: collection,
|
||||
LoadPercentage: 100,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// upgradeRecover recovers from old version <= 2.2.x for compatibility.
|
||||
func (m *CollectionManager) upgradeRecover(broker Broker) error {
|
||||
// for loaded collection from 2.2, it only save a old version CollectionLoadInfo without LoadType.
|
||||
|
|
|
@ -26,14 +26,18 @@ import (
|
|||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/metastore"
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/kv"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
|
@ -494,6 +498,17 @@ func (suite *CollectionManagerSuite) TestUpgradeRecover() {
|
|||
if suite.loadTypes[i] == querypb.LoadType_LoadCollection {
|
||||
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil)
|
||||
}
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: merr.Success(),
|
||||
Schema: &schemapb.CollectionSchema{
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{FieldID: common.RowIDField},
|
||||
{FieldID: common.TimeStampField},
|
||||
{FieldID: 100, Name: "pk"},
|
||||
{FieldID: 101, Name: "vector"},
|
||||
},
|
||||
},
|
||||
}, nil).Maybe()
|
||||
}
|
||||
|
||||
// do recovery
|
||||
|
@ -508,6 +523,131 @@ func (suite *CollectionManagerSuite) TestUpgradeRecover() {
|
|||
}
|
||||
}
|
||||
|
||||
func (suite *CollectionManagerSuite) TestUpgradeLoadFields() {
|
||||
suite.releaseAll()
|
||||
mgr := suite.mgr
|
||||
|
||||
// put old version of collections and partitions
|
||||
for i, collection := range suite.collections {
|
||||
mgr.PutCollection(&Collection{
|
||||
CollectionLoadInfo: &querypb.CollectionLoadInfo{
|
||||
CollectionID: collection,
|
||||
ReplicaNumber: suite.replicaNumber[i],
|
||||
Status: querypb.LoadStatus_Loaded,
|
||||
LoadType: suite.loadTypes[i],
|
||||
LoadFields: nil, // use nil Load fields, mocking old load info
|
||||
},
|
||||
LoadPercentage: 100,
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
for j, partition := range suite.partitions[collection] {
|
||||
mgr.PutPartition(&Partition{
|
||||
PartitionLoadInfo: &querypb.PartitionLoadInfo{
|
||||
CollectionID: collection,
|
||||
PartitionID: partition,
|
||||
Status: querypb.LoadStatus_Loaded,
|
||||
},
|
||||
LoadPercentage: suite.parLoadPercent[collection][j],
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// set expectations
|
||||
for _, collection := range suite.collections {
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: merr.Success(),
|
||||
Schema: &schemapb.CollectionSchema{
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{FieldID: common.RowIDField},
|
||||
{FieldID: common.TimeStampField},
|
||||
{FieldID: 100, Name: "pk"},
|
||||
{FieldID: 101, Name: "vector"},
|
||||
},
|
||||
},
|
||||
}, nil)
|
||||
}
|
||||
|
||||
// do recovery
|
||||
suite.clearMemory()
|
||||
err := mgr.Recover(suite.broker)
|
||||
suite.NoError(err)
|
||||
suite.checkLoadResult()
|
||||
|
||||
for _, collection := range suite.collections {
|
||||
newColl := mgr.GetCollection(collection)
|
||||
suite.ElementsMatch([]int64{100, 101}, newColl.GetLoadFields())
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *CollectionManagerSuite) TestUpgradeLoadFieldsFail() {
|
||||
suite.Run("normal_error", func() {
|
||||
suite.releaseAll()
|
||||
mgr := suite.mgr
|
||||
|
||||
mgr.PutCollection(&Collection{
|
||||
CollectionLoadInfo: &querypb.CollectionLoadInfo{
|
||||
CollectionID: 100,
|
||||
ReplicaNumber: 1,
|
||||
Status: querypb.LoadStatus_Loaded,
|
||||
LoadType: querypb.LoadType_LoadCollection,
|
||||
LoadFields: nil, // use nil Load fields, mocking old load info
|
||||
},
|
||||
LoadPercentage: 100,
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
mgr.PutPartition(&Partition{
|
||||
PartitionLoadInfo: &querypb.PartitionLoadInfo{
|
||||
CollectionID: 100,
|
||||
PartitionID: 1000,
|
||||
Status: querypb.LoadStatus_Loaded,
|
||||
},
|
||||
LoadPercentage: 100,
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(100)).Return(nil, merr.WrapErrServiceInternal("mocked")).Once()
|
||||
// do recovery
|
||||
suite.clearMemory()
|
||||
err := mgr.Recover(suite.broker)
|
||||
suite.Error(err)
|
||||
})
|
||||
|
||||
suite.Run("normal_error", func() {
|
||||
suite.releaseAll()
|
||||
mgr := suite.mgr
|
||||
|
||||
mgr.PutCollection(&Collection{
|
||||
CollectionLoadInfo: &querypb.CollectionLoadInfo{
|
||||
CollectionID: 100,
|
||||
ReplicaNumber: 1,
|
||||
Status: querypb.LoadStatus_Loaded,
|
||||
LoadType: querypb.LoadType_LoadCollection,
|
||||
LoadFields: nil, // use nil Load fields, mocking old load info
|
||||
},
|
||||
LoadPercentage: 100,
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
mgr.PutPartition(&Partition{
|
||||
PartitionLoadInfo: &querypb.PartitionLoadInfo{
|
||||
CollectionID: 100,
|
||||
PartitionID: 1000,
|
||||
Status: querypb.LoadStatus_Loaded,
|
||||
},
|
||||
LoadPercentage: 100,
|
||||
CreatedAt: time.Now(),
|
||||
})
|
||||
|
||||
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(100)).Return(&milvuspb.DescribeCollectionResponse{
|
||||
Status: merr.Status(merr.WrapErrCollectionNotFound(100)),
|
||||
}, nil).Once()
|
||||
// do recovery
|
||||
suite.clearMemory()
|
||||
err := mgr.Recover(suite.broker)
|
||||
suite.NoError(err)
|
||||
})
|
||||
}
|
||||
|
||||
func (suite *CollectionManagerSuite) loadAll() {
|
||||
mgr := suite.mgr
|
||||
|
||||
|
@ -523,6 +663,7 @@ func (suite *CollectionManagerSuite) loadAll() {
|
|||
ReplicaNumber: suite.replicaNumber[i],
|
||||
Status: status,
|
||||
LoadType: suite.loadTypes[i],
|
||||
LoadFields: []int64{100, 101},
|
||||
},
|
||||
LoadPercentage: suite.colLoadPercent[i],
|
||||
CreatedAt: time.Now(),
|
||||
|
|
|
@ -439,6 +439,7 @@ func (suite *ServerSuite) loadAll() {
|
|||
CollectionID: collection,
|
||||
ReplicaNumber: suite.replicaNumber[collection],
|
||||
ResourceGroups: []string{meta.DefaultResourceGroupName},
|
||||
LoadFields: []int64{100, 101},
|
||||
}
|
||||
resp, err := suite.server.LoadCollection(ctx, req)
|
||||
suite.NoError(err)
|
||||
|
@ -449,6 +450,7 @@ func (suite *ServerSuite) loadAll() {
|
|||
PartitionIDs: suite.partitions[collection],
|
||||
ReplicaNumber: suite.replicaNumber[collection],
|
||||
ResourceGroups: []string{meta.DefaultResourceGroupName},
|
||||
LoadFields: []int64{100, 101},
|
||||
}
|
||||
resp, err := suite.server.LoadPartitions(ctx, req)
|
||||
suite.NoError(err)
|
||||
|
|
Loading…
Reference in New Issue