mirror of https://github.com/milvus-io/milvus.git
Cherry-pick from master pr: #28601 See also #28022 #28034 The load segment may reaches before watch dml channel, so the index meta may be empty as well Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/28726/head
parent
288844e3cf
commit
e4ea148c0f
|
@ -427,6 +427,12 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
|
|||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
// check index
|
||||
if len(req.GetIndexInfoList()) == 0 {
|
||||
err := merr.WrapErrIndexNotFoundForCollection(req.GetSchema().GetName())
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
// Delegates request to workers
|
||||
if req.GetNeedTransfer() {
|
||||
delegator, ok := node.delegators.Get(segment.GetInsertChannel())
|
||||
|
|
|
@ -550,6 +550,9 @@ func (suite *ServiceSuite) TestLoadSegments_Int64() {
|
|||
Schema: schema,
|
||||
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
|
||||
NeedTransfer: true,
|
||||
IndexInfoList: []*indexpb.IndexInfo{
|
||||
{},
|
||||
},
|
||||
}
|
||||
|
||||
// LoadSegment
|
||||
|
@ -586,6 +589,7 @@ func (suite *ServiceSuite) TestLoadSegments_VarChar() {
|
|||
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
|
||||
NeedTransfer: true,
|
||||
LoadMeta: loadMeta,
|
||||
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||
}
|
||||
|
||||
// LoadSegment
|
||||
|
@ -605,12 +609,13 @@ func (suite *ServiceSuite) TestLoadDeltaInt64() {
|
|||
MsgID: rand.Int63(),
|
||||
TargetID: suite.node.session.ServerID,
|
||||
},
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: suite.genSegmentLoadInfos(schema),
|
||||
Schema: schema,
|
||||
NeedTransfer: true,
|
||||
LoadScope: querypb.LoadScope_Delta,
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: suite.genSegmentLoadInfos(schema),
|
||||
Schema: schema,
|
||||
NeedTransfer: true,
|
||||
LoadScope: querypb.LoadScope_Delta,
|
||||
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||
}
|
||||
|
||||
// LoadSegment
|
||||
|
@ -629,12 +634,13 @@ func (suite *ServiceSuite) TestLoadDeltaVarchar() {
|
|||
MsgID: rand.Int63(),
|
||||
TargetID: suite.node.session.ServerID,
|
||||
},
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: suite.genSegmentLoadInfos(schema),
|
||||
Schema: schema,
|
||||
NeedTransfer: true,
|
||||
LoadScope: querypb.LoadScope_Delta,
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: suite.genSegmentLoadInfos(schema),
|
||||
Schema: schema,
|
||||
NeedTransfer: true,
|
||||
LoadScope: querypb.LoadScope_Delta,
|
||||
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||
}
|
||||
|
||||
// LoadSegment
|
||||
|
@ -664,12 +670,13 @@ func (suite *ServiceSuite) TestLoadIndex_Success() {
|
|||
MsgID: rand.Int63(),
|
||||
TargetID: suite.node.session.ServerID,
|
||||
},
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: rawInfo,
|
||||
Schema: schema,
|
||||
NeedTransfer: false,
|
||||
LoadScope: querypb.LoadScope_Full,
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: rawInfo,
|
||||
Schema: schema,
|
||||
NeedTransfer: false,
|
||||
LoadScope: querypb.LoadScope_Full,
|
||||
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||
}
|
||||
|
||||
// Load segment
|
||||
|
@ -688,12 +695,13 @@ func (suite *ServiceSuite) TestLoadIndex_Success() {
|
|||
MsgID: rand.Int63(),
|
||||
TargetID: suite.node.session.ServerID,
|
||||
},
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: infos,
|
||||
Schema: schema,
|
||||
NeedTransfer: false,
|
||||
LoadScope: querypb.LoadScope_Index,
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: infos,
|
||||
Schema: schema,
|
||||
NeedTransfer: false,
|
||||
LoadScope: querypb.LoadScope_Index,
|
||||
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||
}
|
||||
|
||||
// Load segment
|
||||
|
@ -732,12 +740,13 @@ func (suite *ServiceSuite) TestLoadIndex_Failed() {
|
|||
MsgID: rand.Int63(),
|
||||
TargetID: suite.node.session.ServerID,
|
||||
},
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: rawInfo,
|
||||
Schema: schema,
|
||||
NeedTransfer: false,
|
||||
LoadScope: querypb.LoadScope_Index,
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: rawInfo,
|
||||
Schema: schema,
|
||||
NeedTransfer: false,
|
||||
LoadScope: querypb.LoadScope_Index,
|
||||
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||
}
|
||||
|
||||
// Load segment
|
||||
|
@ -764,12 +773,13 @@ func (suite *ServiceSuite) TestLoadIndex_Failed() {
|
|||
MsgID: rand.Int63(),
|
||||
TargetID: suite.node.session.ServerID,
|
||||
},
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: infos,
|
||||
Schema: schema,
|
||||
NeedTransfer: false,
|
||||
LoadScope: querypb.LoadScope_Index,
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: infos,
|
||||
Schema: schema,
|
||||
NeedTransfer: false,
|
||||
LoadScope: querypb.LoadScope_Index,
|
||||
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||
}
|
||||
|
||||
// Load segment
|
||||
|
@ -793,6 +803,9 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() {
|
|||
Infos: suite.genSegmentLoadInfos(schema),
|
||||
Schema: schema,
|
||||
NeedTransfer: true,
|
||||
IndexInfoList: []*indexpb.IndexInfo{
|
||||
{},
|
||||
},
|
||||
}
|
||||
|
||||
// Delegator not found
|
||||
|
@ -800,10 +813,18 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() {
|
|||
suite.NoError(err)
|
||||
suite.ErrorIs(merr.Error(status), merr.ErrChannelNotFound)
|
||||
|
||||
// IndexIndex not found
|
||||
nonIndexReq := typeutil.Clone(req)
|
||||
nonIndexReq.IndexInfoList = nil
|
||||
status, err = suite.node.LoadSegments(ctx, nonIndexReq)
|
||||
suite.NoError(err)
|
||||
suite.ErrorIs(merr.Error(status), merr.ErrIndexNotFound)
|
||||
|
||||
// target not match
|
||||
req.Base.TargetID = -1
|
||||
status, err = suite.node.LoadSegments(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.T().Log(merr.Error(status))
|
||||
suite.ErrorIs(merr.Error(status), merr.ErrNodeNotMatch)
|
||||
|
||||
// node not healthy
|
||||
|
@ -829,11 +850,12 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
|
|||
MsgID: rand.Int63(),
|
||||
TargetID: suite.node.session.ServerID,
|
||||
},
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: suite.genSegmentLoadInfos(schema),
|
||||
Schema: schema,
|
||||
NeedTransfer: true,
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: suite.genSegmentLoadInfos(schema),
|
||||
Schema: schema,
|
||||
NeedTransfer: true,
|
||||
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||
}
|
||||
|
||||
// LoadSegment
|
||||
|
@ -850,11 +872,12 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
|
|||
MsgID: rand.Int63(),
|
||||
TargetID: suite.node.session.ServerID,
|
||||
},
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: suite.genSegmentLoadInfos(schema),
|
||||
Schema: schema,
|
||||
NeedTransfer: true,
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: suite.genSegmentLoadInfos(schema),
|
||||
Schema: schema,
|
||||
NeedTransfer: true,
|
||||
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||
}
|
||||
|
||||
// LoadSegment
|
||||
|
@ -876,14 +899,14 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
|
|||
MsgID: rand.Int63(),
|
||||
TargetID: suite.node.session.ServerID,
|
||||
},
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: suite.genSegmentLoadInfos(schema),
|
||||
Schema: schema,
|
||||
NeedTransfer: true,
|
||||
CollectionID: suite.collectionID,
|
||||
DstNodeID: suite.node.session.ServerID,
|
||||
Infos: suite.genSegmentLoadInfos(schema),
|
||||
Schema: schema,
|
||||
NeedTransfer: true,
|
||||
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||
}
|
||||
|
||||
// LoadSegment
|
||||
// LoadSegment
|
||||
status, err := suite.node.LoadSegments(ctx, req)
|
||||
suite.NoError(err)
|
||||
|
|
Loading…
Reference in New Issue