mirror of https://github.com/milvus-io/milvus.git
fix: Add IndexList check for load segment request (#28601)
See also #28022 #28034 The load segment may reaches before watch dml channel, so the index meta may be empty as well --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/28702/head
parent
a2fe9dad49
commit
098c1c926d
|
@ -446,6 +446,12 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
|
||||||
return merr.Status(err), nil
|
return merr.Status(err), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check index
|
||||||
|
if len(req.GetIndexInfoList()) == 0 {
|
||||||
|
err := merr.WrapErrIndexNotFoundForCollection(req.GetSchema().GetName())
|
||||||
|
return merr.Status(err), nil
|
||||||
|
}
|
||||||
|
|
||||||
// Delegates request to workers
|
// Delegates request to workers
|
||||||
if req.GetNeedTransfer() {
|
if req.GetNeedTransfer() {
|
||||||
delegator, ok := node.delegators.Get(segment.GetInsertChannel())
|
delegator, ok := node.delegators.Get(segment.GetInsertChannel())
|
||||||
|
|
|
@ -567,6 +567,9 @@ func (suite *ServiceSuite) TestLoadSegments_Int64() {
|
||||||
Schema: schema,
|
Schema: schema,
|
||||||
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
|
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
|
||||||
NeedTransfer: true,
|
NeedTransfer: true,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{
|
||||||
|
{},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadSegment
|
// LoadSegment
|
||||||
|
@ -603,6 +606,7 @@ func (suite *ServiceSuite) TestLoadSegments_VarChar() {
|
||||||
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
|
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
|
||||||
NeedTransfer: true,
|
NeedTransfer: true,
|
||||||
LoadMeta: loadMeta,
|
LoadMeta: loadMeta,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadSegment
|
// LoadSegment
|
||||||
|
@ -622,12 +626,13 @@ func (suite *ServiceSuite) TestLoadDeltaInt64() {
|
||||||
MsgID: rand.Int63(),
|
MsgID: rand.Int63(),
|
||||||
TargetID: suite.node.session.ServerID,
|
TargetID: suite.node.session.ServerID,
|
||||||
},
|
},
|
||||||
CollectionID: suite.collectionID,
|
CollectionID: suite.collectionID,
|
||||||
DstNodeID: suite.node.session.ServerID,
|
DstNodeID: suite.node.session.ServerID,
|
||||||
Infos: suite.genSegmentLoadInfos(schema),
|
Infos: suite.genSegmentLoadInfos(schema),
|
||||||
Schema: schema,
|
Schema: schema,
|
||||||
NeedTransfer: true,
|
NeedTransfer: true,
|
||||||
LoadScope: querypb.LoadScope_Delta,
|
LoadScope: querypb.LoadScope_Delta,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadSegment
|
// LoadSegment
|
||||||
|
@ -646,12 +651,13 @@ func (suite *ServiceSuite) TestLoadDeltaVarchar() {
|
||||||
MsgID: rand.Int63(),
|
MsgID: rand.Int63(),
|
||||||
TargetID: suite.node.session.ServerID,
|
TargetID: suite.node.session.ServerID,
|
||||||
},
|
},
|
||||||
CollectionID: suite.collectionID,
|
CollectionID: suite.collectionID,
|
||||||
DstNodeID: suite.node.session.ServerID,
|
DstNodeID: suite.node.session.ServerID,
|
||||||
Infos: suite.genSegmentLoadInfos(schema),
|
Infos: suite.genSegmentLoadInfos(schema),
|
||||||
Schema: schema,
|
Schema: schema,
|
||||||
NeedTransfer: true,
|
NeedTransfer: true,
|
||||||
LoadScope: querypb.LoadScope_Delta,
|
LoadScope: querypb.LoadScope_Delta,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadSegment
|
// LoadSegment
|
||||||
|
@ -681,12 +687,13 @@ func (suite *ServiceSuite) TestLoadIndex_Success() {
|
||||||
MsgID: rand.Int63(),
|
MsgID: rand.Int63(),
|
||||||
TargetID: suite.node.session.ServerID,
|
TargetID: suite.node.session.ServerID,
|
||||||
},
|
},
|
||||||
CollectionID: suite.collectionID,
|
CollectionID: suite.collectionID,
|
||||||
DstNodeID: suite.node.session.ServerID,
|
DstNodeID: suite.node.session.ServerID,
|
||||||
Infos: rawInfo,
|
Infos: rawInfo,
|
||||||
Schema: schema,
|
Schema: schema,
|
||||||
NeedTransfer: false,
|
NeedTransfer: false,
|
||||||
LoadScope: querypb.LoadScope_Full,
|
LoadScope: querypb.LoadScope_Full,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load segment
|
// Load segment
|
||||||
|
@ -705,12 +712,13 @@ func (suite *ServiceSuite) TestLoadIndex_Success() {
|
||||||
MsgID: rand.Int63(),
|
MsgID: rand.Int63(),
|
||||||
TargetID: suite.node.session.ServerID,
|
TargetID: suite.node.session.ServerID,
|
||||||
},
|
},
|
||||||
CollectionID: suite.collectionID,
|
CollectionID: suite.collectionID,
|
||||||
DstNodeID: suite.node.session.ServerID,
|
DstNodeID: suite.node.session.ServerID,
|
||||||
Infos: infos,
|
Infos: infos,
|
||||||
Schema: schema,
|
Schema: schema,
|
||||||
NeedTransfer: false,
|
NeedTransfer: false,
|
||||||
LoadScope: querypb.LoadScope_Index,
|
LoadScope: querypb.LoadScope_Index,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load segment
|
// Load segment
|
||||||
|
@ -749,12 +757,13 @@ func (suite *ServiceSuite) TestLoadIndex_Failed() {
|
||||||
MsgID: rand.Int63(),
|
MsgID: rand.Int63(),
|
||||||
TargetID: suite.node.session.ServerID,
|
TargetID: suite.node.session.ServerID,
|
||||||
},
|
},
|
||||||
CollectionID: suite.collectionID,
|
CollectionID: suite.collectionID,
|
||||||
DstNodeID: suite.node.session.ServerID,
|
DstNodeID: suite.node.session.ServerID,
|
||||||
Infos: rawInfo,
|
Infos: rawInfo,
|
||||||
Schema: schema,
|
Schema: schema,
|
||||||
NeedTransfer: false,
|
NeedTransfer: false,
|
||||||
LoadScope: querypb.LoadScope_Index,
|
LoadScope: querypb.LoadScope_Index,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load segment
|
// Load segment
|
||||||
|
@ -781,12 +790,13 @@ func (suite *ServiceSuite) TestLoadIndex_Failed() {
|
||||||
MsgID: rand.Int63(),
|
MsgID: rand.Int63(),
|
||||||
TargetID: suite.node.session.ServerID,
|
TargetID: suite.node.session.ServerID,
|
||||||
},
|
},
|
||||||
CollectionID: suite.collectionID,
|
CollectionID: suite.collectionID,
|
||||||
DstNodeID: suite.node.session.ServerID,
|
DstNodeID: suite.node.session.ServerID,
|
||||||
Infos: infos,
|
Infos: infos,
|
||||||
Schema: schema,
|
Schema: schema,
|
||||||
NeedTransfer: false,
|
NeedTransfer: false,
|
||||||
LoadScope: querypb.LoadScope_Index,
|
LoadScope: querypb.LoadScope_Index,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load segment
|
// Load segment
|
||||||
|
@ -810,6 +820,9 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() {
|
||||||
Infos: suite.genSegmentLoadInfos(schema),
|
Infos: suite.genSegmentLoadInfos(schema),
|
||||||
Schema: schema,
|
Schema: schema,
|
||||||
NeedTransfer: true,
|
NeedTransfer: true,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{
|
||||||
|
{},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delegator not found
|
// Delegator not found
|
||||||
|
@ -817,10 +830,18 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() {
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
suite.ErrorIs(merr.Error(status), merr.ErrChannelNotFound)
|
suite.ErrorIs(merr.Error(status), merr.ErrChannelNotFound)
|
||||||
|
|
||||||
|
// IndexIndex not found
|
||||||
|
nonIndexReq := typeutil.Clone(req)
|
||||||
|
nonIndexReq.IndexInfoList = nil
|
||||||
|
status, err = suite.node.LoadSegments(ctx, nonIndexReq)
|
||||||
|
suite.NoError(err)
|
||||||
|
suite.ErrorIs(merr.Error(status), merr.ErrIndexNotFound)
|
||||||
|
|
||||||
// target not match
|
// target not match
|
||||||
req.Base.TargetID = -1
|
req.Base.TargetID = -1
|
||||||
status, err = suite.node.LoadSegments(ctx, req)
|
status, err = suite.node.LoadSegments(ctx, req)
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
|
suite.T().Log(merr.Error(status))
|
||||||
suite.ErrorIs(merr.Error(status), merr.ErrNodeNotMatch)
|
suite.ErrorIs(merr.Error(status), merr.ErrNodeNotMatch)
|
||||||
|
|
||||||
// node not healthy
|
// node not healthy
|
||||||
|
@ -846,11 +867,12 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
|
||||||
MsgID: rand.Int63(),
|
MsgID: rand.Int63(),
|
||||||
TargetID: suite.node.session.ServerID,
|
TargetID: suite.node.session.ServerID,
|
||||||
},
|
},
|
||||||
CollectionID: suite.collectionID,
|
CollectionID: suite.collectionID,
|
||||||
DstNodeID: suite.node.session.ServerID,
|
DstNodeID: suite.node.session.ServerID,
|
||||||
Infos: suite.genSegmentLoadInfos(schema),
|
Infos: suite.genSegmentLoadInfos(schema),
|
||||||
Schema: schema,
|
Schema: schema,
|
||||||
NeedTransfer: true,
|
NeedTransfer: true,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadSegment
|
// LoadSegment
|
||||||
|
@ -867,11 +889,12 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
|
||||||
MsgID: rand.Int63(),
|
MsgID: rand.Int63(),
|
||||||
TargetID: suite.node.session.ServerID,
|
TargetID: suite.node.session.ServerID,
|
||||||
},
|
},
|
||||||
CollectionID: suite.collectionID,
|
CollectionID: suite.collectionID,
|
||||||
DstNodeID: suite.node.session.ServerID,
|
DstNodeID: suite.node.session.ServerID,
|
||||||
Infos: suite.genSegmentLoadInfos(schema),
|
Infos: suite.genSegmentLoadInfos(schema),
|
||||||
Schema: schema,
|
Schema: schema,
|
||||||
NeedTransfer: true,
|
NeedTransfer: true,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadSegment
|
// LoadSegment
|
||||||
|
@ -893,14 +916,14 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
|
||||||
MsgID: rand.Int63(),
|
MsgID: rand.Int63(),
|
||||||
TargetID: suite.node.session.ServerID,
|
TargetID: suite.node.session.ServerID,
|
||||||
},
|
},
|
||||||
CollectionID: suite.collectionID,
|
CollectionID: suite.collectionID,
|
||||||
DstNodeID: suite.node.session.ServerID,
|
DstNodeID: suite.node.session.ServerID,
|
||||||
Infos: suite.genSegmentLoadInfos(schema),
|
Infos: suite.genSegmentLoadInfos(schema),
|
||||||
Schema: schema,
|
Schema: schema,
|
||||||
NeedTransfer: true,
|
NeedTransfer: true,
|
||||||
|
IndexInfoList: []*indexpb.IndexInfo{{}},
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadSegment
|
|
||||||
// LoadSegment
|
// LoadSegment
|
||||||
status, err := suite.node.LoadSegments(ctx, req)
|
status, err := suite.node.LoadSegments(ctx, req)
|
||||||
suite.NoError(err)
|
suite.NoError(err)
|
||||||
|
|
Loading…
Reference in New Issue