mirror of https://github.com/milvus-io/milvus.git
enhance: Handle PutOrRef collection schema failure error (#39310)
Related to previous pr #39279 When NewCollection returns nil, the error shall be returned and handled by caller Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/39253/merge
parent
57e5652f1a
commit
bca2a62b78
|
@ -1518,9 +1518,10 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
|
||||||
s.Require().NoError(err)
|
s.Require().NoError(err)
|
||||||
|
|
||||||
schema := mock_segcore.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
|
schema := mock_segcore.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
|
||||||
collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
|
collection, err := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
|
||||||
LoadType: querypb.LoadType_LoadCollection,
|
LoadType: querypb.LoadType_LoadCollection,
|
||||||
})
|
})
|
||||||
|
s.NoError(err)
|
||||||
|
|
||||||
l0, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 1, &querypb.SegmentLoadInfo{
|
l0, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 1, &querypb.SegmentLoadInfo{
|
||||||
CollectionID: 1,
|
CollectionID: 1,
|
||||||
|
|
|
@ -96,9 +96,10 @@ func (suite *LocalWorkerTestSuite) BeforeTest(suiteName, testName string) {
|
||||||
|
|
||||||
suite.schema = mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
|
suite.schema = mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
|
||||||
suite.indexMeta = mock_segcore.GenTestIndexMeta(suite.collectionID, suite.schema)
|
suite.indexMeta = mock_segcore.GenTestIndexMeta(suite.collectionID, suite.schema)
|
||||||
collection := segments.NewCollection(suite.collectionID, suite.schema, suite.indexMeta, &querypb.LoadMetaInfo{
|
collection, err := segments.NewCollection(suite.collectionID, suite.schema, suite.indexMeta, &querypb.LoadMetaInfo{
|
||||||
LoadType: querypb.LoadType_LoadCollection,
|
LoadType: querypb.LoadType_LoadCollection,
|
||||||
})
|
})
|
||||||
|
suite.NoError(err)
|
||||||
loadMata := &querypb.LoadMetaInfo{
|
loadMata := &querypb.LoadMetaInfo{
|
||||||
LoadType: querypb.LoadType_LoadCollection,
|
LoadType: querypb.LoadType_LoadCollection,
|
||||||
CollectionID: suite.collectionID,
|
CollectionID: suite.collectionID,
|
||||||
|
|
|
@ -62,9 +62,10 @@ func (suite *InsertNodeSuite) TestBasic() {
|
||||||
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
|
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
|
||||||
in := suite.buildInsertNodeMsg(schema)
|
in := suite.buildInsertNodeMsg(schema)
|
||||||
|
|
||||||
collection := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
|
collection, err := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
|
||||||
LoadType: querypb.LoadType_LoadCollection,
|
LoadType: querypb.LoadType_LoadCollection,
|
||||||
})
|
})
|
||||||
|
suite.NoError(err)
|
||||||
collection.AddPartition(suite.partitionID)
|
collection.AddPartition(suite.partitionID)
|
||||||
|
|
||||||
// init mock
|
// init mock
|
||||||
|
@ -98,9 +99,10 @@ func (suite *InsertNodeSuite) TestDataTypeNotSupported() {
|
||||||
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
|
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
|
||||||
in := suite.buildInsertNodeMsg(schema)
|
in := suite.buildInsertNodeMsg(schema)
|
||||||
|
|
||||||
collection := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
|
collection, err := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
|
||||||
LoadType: querypb.LoadType_LoadCollection,
|
LoadType: querypb.LoadType_LoadCollection,
|
||||||
})
|
})
|
||||||
|
suite.NoError(err)
|
||||||
collection.AddPartition(suite.partitionID)
|
collection.AddPartition(suite.partitionID)
|
||||||
|
|
||||||
// init mock
|
// init mock
|
||||||
|
|
|
@ -102,7 +102,7 @@ func (suite *PipelineTestSuite) TestBasic() {
|
||||||
// init mock
|
// init mock
|
||||||
// mock collection manager
|
// mock collection manager
|
||||||
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
|
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, true)
|
||||||
collection := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
|
collection, err := segments.NewCollection(suite.collectionID, schema, mock_segcore.GenTestIndexMeta(suite.collectionID, schema), &querypb.LoadMetaInfo{
|
||||||
LoadType: querypb.LoadType_LoadCollection,
|
LoadType: querypb.LoadType_LoadCollection,
|
||||||
DbProperties: []*commonpb.KeyValuePair{
|
DbProperties: []*commonpb.KeyValuePair{
|
||||||
{
|
{
|
||||||
|
@ -111,6 +111,7 @@ func (suite *PipelineTestSuite) TestBasic() {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
suite.Require().NoError(err)
|
||||||
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection)
|
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection)
|
||||||
|
|
||||||
// mock mq factory
|
// mock mq factory
|
||||||
|
|
|
@ -41,7 +41,7 @@ type CollectionManager interface {
|
||||||
List() []int64
|
List() []int64
|
||||||
ListWithName() map[int64]string
|
ListWithName() map[int64]string
|
||||||
Get(collectionID int64) *Collection
|
Get(collectionID int64) *Collection
|
||||||
PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo)
|
PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) error
|
||||||
Ref(collectionID int64, count uint32) bool
|
Ref(collectionID int64, count uint32) bool
|
||||||
// unref the collection,
|
// unref the collection,
|
||||||
// returns true if the collection ref count goes 0, or the collection not exists,
|
// returns true if the collection ref count goes 0, or the collection not exists,
|
||||||
|
@ -84,7 +84,7 @@ func (m *collectionManager) Get(collectionID int64) *Collection {
|
||||||
return m.collections[collectionID]
|
return m.collections[collectionID]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) {
|
func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) error {
|
||||||
m.mut.Lock()
|
m.mut.Lock()
|
||||||
defer m.mut.Unlock()
|
defer m.mut.Unlock()
|
||||||
|
|
||||||
|
@ -92,14 +92,18 @@ func (m *collectionManager) PutOrRef(collectionID int64, schema *schemapb.Collec
|
||||||
// the schema may be changed even the collection is loaded
|
// the schema may be changed even the collection is loaded
|
||||||
collection.schema.Store(schema)
|
collection.schema.Store(schema)
|
||||||
collection.Ref(1)
|
collection.Ref(1)
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("put new collection", zap.Int64("collectionID", collectionID), zap.Any("schema", schema))
|
log.Info("put new collection", zap.Int64("collectionID", collectionID), zap.Any("schema", schema))
|
||||||
collection := NewCollection(collectionID, schema, meta, loadMeta)
|
collection, err := NewCollection(collectionID, schema, meta, loadMeta)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
collection.Ref(1)
|
collection.Ref(1)
|
||||||
m.collections[collectionID] = collection
|
m.collections[collectionID] = collection
|
||||||
m.updateMetric()
|
m.updateMetric()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *collectionManager) updateMetric() {
|
func (m *collectionManager) updateMetric() {
|
||||||
|
@ -245,7 +249,7 @@ func (c *Collection) Unref(count uint32) uint32 {
|
||||||
}
|
}
|
||||||
|
|
||||||
// newCollection returns a new Collection
|
// newCollection returns a new Collection
|
||||||
func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadMetaInfo *querypb.LoadMetaInfo) *Collection {
|
func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexMeta *segcorepb.CollectionIndexMeta, loadMetaInfo *querypb.LoadMetaInfo) (*Collection, error) {
|
||||||
/*
|
/*
|
||||||
CCollection
|
CCollection
|
||||||
NewCollection(const char* schema_proto_blob);
|
NewCollection(const char* schema_proto_blob);
|
||||||
|
@ -281,7 +285,7 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM
|
||||||
ccollection, err := segcore.CreateCCollection(req)
|
ccollection, err := segcore.CreateCCollection(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("create collection failed", zap.Error(err))
|
log.Warn("create collection failed", zap.Error(err))
|
||||||
return nil
|
return nil, err
|
||||||
}
|
}
|
||||||
coll := &Collection{
|
coll := &Collection{
|
||||||
ccollection: ccollection,
|
ccollection: ccollection,
|
||||||
|
@ -300,7 +304,7 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM
|
||||||
}
|
}
|
||||||
coll.schema.Store(schema)
|
coll.schema.Store(schema)
|
||||||
|
|
||||||
return coll
|
return coll, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only for test
|
// Only for test
|
||||||
|
|
|
@ -52,11 +52,13 @@ func (s *ManagerSuite) SetupTest() {
|
||||||
|
|
||||||
for i, id := range s.segmentIDs {
|
for i, id := range s.segmentIDs {
|
||||||
schema := mock_segcore.GenTestCollectionSchema("manager-suite", schemapb.DataType_Int64, true)
|
schema := mock_segcore.GenTestCollectionSchema("manager-suite", schemapb.DataType_Int64, true)
|
||||||
|
collection, err := NewCollection(s.collectionIDs[i], schema, mock_segcore.GenTestIndexMeta(s.collectionIDs[i], schema), &querypb.LoadMetaInfo{
|
||||||
|
LoadType: querypb.LoadType_LoadCollection,
|
||||||
|
})
|
||||||
|
s.Require().NoError(err)
|
||||||
segment, err := NewSegment(
|
segment, err := NewSegment(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
NewCollection(s.collectionIDs[i], schema, mock_segcore.GenTestIndexMeta(s.collectionIDs[i], schema), &querypb.LoadMetaInfo{
|
collection,
|
||||||
LoadType: querypb.LoadType_LoadCollection,
|
|
||||||
}),
|
|
||||||
s.types[i],
|
s.types[i],
|
||||||
0,
|
0,
|
||||||
&querypb.SegmentLoadInfo{
|
&querypb.SegmentLoadInfo{
|
||||||
|
|
|
@ -166,8 +166,21 @@ func (_c *MockCollectionManager_ListWithName_Call) RunAndReturn(run func() map[i
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutOrRef provides a mock function with given fields: collectionID, schema, meta, loadMeta
|
// PutOrRef provides a mock function with given fields: collectionID, schema, meta, loadMeta
|
||||||
func (_m *MockCollectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) {
|
func (_m *MockCollectionManager) PutOrRef(collectionID int64, schema *schemapb.CollectionSchema, meta *segcorepb.CollectionIndexMeta, loadMeta *querypb.LoadMetaInfo) error {
|
||||||
_m.Called(collectionID, schema, meta, loadMeta)
|
ret := _m.Called(collectionID, schema, meta, loadMeta)
|
||||||
|
|
||||||
|
if len(ret) == 0 {
|
||||||
|
panic("no return value specified for PutOrRef")
|
||||||
|
}
|
||||||
|
|
||||||
|
var r0 error
|
||||||
|
if rf, ok := ret.Get(0).(func(int64, *schemapb.CollectionSchema, *segcorepb.CollectionIndexMeta, *querypb.LoadMetaInfo) error); ok {
|
||||||
|
r0 = rf(collectionID, schema, meta, loadMeta)
|
||||||
|
} else {
|
||||||
|
r0 = ret.Error(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
return r0
|
||||||
}
|
}
|
||||||
|
|
||||||
// MockCollectionManager_PutOrRef_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PutOrRef'
|
// MockCollectionManager_PutOrRef_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PutOrRef'
|
||||||
|
@ -191,12 +204,12 @@ func (_c *MockCollectionManager_PutOrRef_Call) Run(run func(collectionID int64,
|
||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_c *MockCollectionManager_PutOrRef_Call) Return() *MockCollectionManager_PutOrRef_Call {
|
func (_c *MockCollectionManager_PutOrRef_Call) Return(_a0 error) *MockCollectionManager_PutOrRef_Call {
|
||||||
_c.Call.Return()
|
_c.Call.Return(_a0)
|
||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_c *MockCollectionManager_PutOrRef_Call) RunAndReturn(run func(int64, *schemapb.CollectionSchema, *segcorepb.CollectionIndexMeta, *querypb.LoadMetaInfo)) *MockCollectionManager_PutOrRef_Call {
|
func (_c *MockCollectionManager_PutOrRef_Call) RunAndReturn(run func(int64, *schemapb.CollectionSchema, *segcorepb.CollectionIndexMeta, *querypb.LoadMetaInfo) error) *MockCollectionManager_PutOrRef_Call {
|
||||||
_c.Call.Return(run)
|
_c.Call.Return(run)
|
||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
|
@ -811,7 +811,8 @@ func (suite *SegmentLoaderDetailSuite) SetupTest() {
|
||||||
PartitionIDs: []int64{suite.partitionID},
|
PartitionIDs: []int64{suite.partitionID},
|
||||||
}
|
}
|
||||||
|
|
||||||
collection := NewCollection(suite.collectionID, schema, indexMeta, loadMeta)
|
collection, err := NewCollection(suite.collectionID, schema, indexMeta, loadMeta)
|
||||||
|
suite.Require().NoError(err)
|
||||||
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection).Maybe()
|
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(collection).Maybe()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -221,9 +221,10 @@ func (suite *QueryNodeSuite) TestStop() {
|
||||||
suite.node.manager = segments.NewManager()
|
suite.node.manager = segments.NewManager()
|
||||||
|
|
||||||
schema := mock_segcore.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
|
schema := mock_segcore.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
|
||||||
collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
|
collection, err := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
|
||||||
LoadType: querypb.LoadType_LoadCollection,
|
LoadType: querypb.LoadType_LoadCollection,
|
||||||
})
|
})
|
||||||
|
suite.Require().NoError(err)
|
||||||
segment, err := segments.NewSegment(
|
segment, err := segments.NewSegment(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
collection,
|
collection,
|
||||||
|
|
|
@ -238,8 +238,12 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
|
||||||
return merr.Success(), nil
|
return merr.Success(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
|
err := node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
|
||||||
node.composeIndexMeta(ctx, req.GetIndexInfoList(), req.Schema), req.GetLoadMeta())
|
node.composeIndexMeta(ctx, req.GetIndexInfoList(), req.Schema), req.GetLoadMeta())
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("failed to ref collection", zap.Error(err))
|
||||||
|
return merr.Status(err), nil
|
||||||
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if !merr.Ok(status) {
|
if !merr.Ok(status) {
|
||||||
node.manager.Collection.Unref(req.GetCollectionID(), 1)
|
node.manager.Collection.Unref(req.GetCollectionID(), 1)
|
||||||
|
@ -474,8 +478,12 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
|
||||||
return merr.Success(), nil
|
return merr.Success(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
|
err := node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
|
||||||
node.composeIndexMeta(ctx, req.GetIndexInfoList(), req.GetSchema()), req.GetLoadMeta())
|
node.composeIndexMeta(ctx, req.GetIndexInfoList(), req.GetSchema()), req.GetLoadMeta())
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("failed to ref collection", zap.Error(err))
|
||||||
|
return merr.Status(err), nil
|
||||||
|
}
|
||||||
defer node.manager.Collection.Unref(req.GetCollectionID(), 1)
|
defer node.manager.Collection.Unref(req.GetCollectionID(), 1)
|
||||||
|
|
||||||
if req.GetLoadScope() == querypb.LoadScope_Delta {
|
if req.GetLoadScope() == querypb.LoadScope_Delta {
|
||||||
|
|
|
@ -380,6 +380,69 @@ func (suite *ServiceSuite) TestWatchDmChannelsVarchar() {
|
||||||
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
|
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *ServiceSuite) TestWatchDmChannels_BadIndexMeta() {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// data
|
||||||
|
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, false)
|
||||||
|
deltaLogs, err := mock_segcore.SaveDeltaLog(suite.collectionID,
|
||||||
|
suite.partitionIDs[0],
|
||||||
|
suite.flushedSegmentIDs[0],
|
||||||
|
suite.node.chunkManager,
|
||||||
|
)
|
||||||
|
suite.NoError(err)
|
||||||
|
|
||||||
|
req := &querypb.WatchDmChannelsRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
MsgType: commonpb.MsgType_WatchDmChannels,
|
||||||
|
MsgID: rand.Int63(),
|
||||||
|
TargetID: suite.node.session.ServerID,
|
||||||
|
},
|
||||||
|
NodeID: suite.node.session.ServerID,
|
||||||
|
CollectionID: suite.collectionID,
|
||||||
|
PartitionIDs: suite.partitionIDs,
|
||||||
|
Infos: []*datapb.VchannelInfo{
|
||||||
|
{
|
||||||
|
CollectionID: suite.collectionID,
|
||||||
|
ChannelName: suite.vchannel,
|
||||||
|
SeekPosition: suite.position,
|
||||||
|
FlushedSegmentIds: suite.flushedSegmentIDs,
|
||||||
|
DroppedSegmentIds: suite.droppedSegmentIDs,
|
||||||
|
LevelZeroSegmentIds: suite.levelZeroSegmentIDs,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
SegmentInfos: map[int64]*datapb.SegmentInfo{
|
||||||
|
suite.levelZeroSegmentIDs[0]: {
|
||||||
|
ID: suite.levelZeroSegmentIDs[0],
|
||||||
|
CollectionID: suite.collectionID,
|
||||||
|
PartitionID: suite.partitionIDs[0],
|
||||||
|
InsertChannel: suite.vchannel,
|
||||||
|
Deltalogs: deltaLogs,
|
||||||
|
Level: datapb.SegmentLevel_L0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Schema: schema,
|
||||||
|
LoadMeta: &querypb.LoadMetaInfo{
|
||||||
|
LoadType: querypb.LoadType_LoadCollection,
|
||||||
|
CollectionID: suite.collectionID,
|
||||||
|
PartitionIDs: suite.partitionIDs,
|
||||||
|
MetricType: defaultMetricType,
|
||||||
|
},
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{{
|
||||||
|
IndexName: "bad_index",
|
||||||
|
FieldID: 100,
|
||||||
|
TypeParams: []*commonpb.KeyValuePair{
|
||||||
|
{Key: "dup_key", Value: "val"},
|
||||||
|
{Key: "dup_key", Value: "val"},
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
|
||||||
|
// watchDmChannels
|
||||||
|
status, err := suite.node.WatchDmChannels(ctx, req)
|
||||||
|
suite.Error(merr.CheckRPCCall(status, err))
|
||||||
|
}
|
||||||
|
|
||||||
func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
|
func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
@ -658,6 +721,49 @@ func (suite *ServiceSuite) TestLoadSegments_VarChar() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *ServiceSuite) TestLoadSegments_BadIndexMeta() {
|
||||||
|
ctx := context.Background()
|
||||||
|
suite.TestWatchDmChannelsVarchar()
|
||||||
|
// data
|
||||||
|
schema := mock_segcore.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_VarChar, false)
|
||||||
|
loadMeta := &querypb.LoadMetaInfo{
|
||||||
|
LoadType: querypb.LoadType_LoadCollection,
|
||||||
|
CollectionID: suite.collectionID,
|
||||||
|
PartitionIDs: suite.partitionIDs,
|
||||||
|
}
|
||||||
|
suite.node.manager.Collection = segments.NewCollectionManager()
|
||||||
|
// suite.node.manager.Collection.PutOrRef(suite.collectionID, schema, nil, loadMeta)
|
||||||
|
|
||||||
|
infos := suite.genSegmentLoadInfos(schema, nil)
|
||||||
|
for _, info := range infos {
|
||||||
|
req := &querypb.LoadSegmentsRequest{
|
||||||
|
Base: &commonpb.MsgBase{
|
||||||
|
MsgID: rand.Int63(),
|
||||||
|
TargetID: suite.node.session.ServerID,
|
||||||
|
},
|
||||||
|
CollectionID: suite.collectionID,
|
||||||
|
DstNodeID: suite.node.session.ServerID,
|
||||||
|
Infos: []*querypb.SegmentLoadInfo{info},
|
||||||
|
Schema: schema,
|
||||||
|
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
|
||||||
|
NeedTransfer: true,
|
||||||
|
LoadMeta: loadMeta,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{{
|
||||||
|
IndexName: "bad_index",
|
||||||
|
FieldID: 100,
|
||||||
|
TypeParams: []*commonpb.KeyValuePair{
|
||||||
|
{Key: "dup_key", Value: "val"},
|
||||||
|
{Key: "dup_key", Value: "val"},
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadSegment
|
||||||
|
status, err := suite.node.LoadSegments(ctx, req)
|
||||||
|
suite.Error(merr.CheckRPCCall(status, err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (suite *ServiceSuite) TestLoadDeltaInt64() {
|
func (suite *ServiceSuite) TestLoadDeltaInt64() {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
suite.TestLoadSegments_Int64()
|
suite.TestLoadSegments_Int64()
|
||||||
|
|
Loading…
Reference in New Issue