mirror of https://github.com/milvus-io/milvus.git
Return error if setSegment failed when loadSegment (#8075)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/8182/head
parent
a6ab41e48b
commit
338e32362b
|
@ -83,14 +83,15 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
|
|||
deleteSegment(s)
|
||||
}
|
||||
}
|
||||
setSegments := func() {
|
||||
setSegments := func() error {
|
||||
for _, s := range newSegments {
|
||||
err := loader.historicalReplica.setSegment(s)
|
||||
if err != nil {
|
||||
log.Warn(err.Error())
|
||||
deleteSegment(s)
|
||||
segmentGC()
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// start to load
|
||||
|
@ -142,9 +143,8 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
|
|||
}
|
||||
newSegments = append(newSegments, segment)
|
||||
}
|
||||
setSegments()
|
||||
|
||||
return nil
|
||||
return setSegments()
|
||||
}
|
||||
|
||||
func (loader *segmentLoader) loadSegmentInternal(collectionID UniqueID, segment *Segment, segmentLoadInfo *querypb.SegmentLoadInfo) error {
|
||||
|
|
|
@ -31,12 +31,6 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
historical, err := genSimpleHistorical(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = historical.replica.removeSegment(defaultSegmentID)
|
||||
assert.NoError(t, err)
|
||||
|
||||
kv, err := genEtcdKV()
|
||||
assert.NoError(t, err)
|
||||
|
||||
|
@ -46,6 +40,11 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
t.Run("test no segment meta", func(t *testing.T) {
|
||||
historical, err := genSimpleHistorical(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = historical.replica.removeSegment(defaultSegmentID)
|
||||
assert.NoError(t, err)
|
||||
loader := newSegmentLoader(ctx, nil, nil, historical.replica, kv)
|
||||
assert.NotNil(t, loader)
|
||||
|
||||
|
@ -76,6 +75,11 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("test load segment", func(t *testing.T) {
|
||||
historical, err := genSimpleHistorical(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = historical.replica.removeSegment(defaultSegmentID)
|
||||
assert.NoError(t, err)
|
||||
loader := newSegmentLoader(ctx, nil, nil, historical.replica, kv)
|
||||
assert.NotNil(t, loader)
|
||||
|
||||
|
@ -106,6 +110,43 @@ func TestSegmentLoader_loadSegment(t *testing.T) {
|
|||
err = loader.loadSegment(req, true)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("test set segment error", func(t *testing.T) {
|
||||
historical, err := genSimpleHistorical(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = historical.replica.removePartition(defaultPartitionID)
|
||||
assert.NoError(t, err)
|
||||
loader := newSegmentLoader(ctx, nil, nil, historical.replica, kv)
|
||||
assert.NotNil(t, loader)
|
||||
|
||||
req := &querypb.LoadSegmentsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_WatchQueryChannels,
|
||||
MsgID: rand.Int63(),
|
||||
},
|
||||
NodeID: 0,
|
||||
Schema: schema,
|
||||
LoadCondition: querypb.TriggerCondition_grpcRequest,
|
||||
Infos: []*querypb.SegmentLoadInfo{
|
||||
{
|
||||
SegmentID: defaultSegmentID,
|
||||
PartitionID: defaultPartitionID,
|
||||
CollectionID: defaultCollectionID,
|
||||
BinlogPaths: fieldBinlog,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%s/%d", queryCoordSegmentMetaPrefix, defaultSegmentID)
|
||||
segmentInfo := &querypb.SegmentInfo{}
|
||||
value := proto.MarshalTextString(segmentInfo)
|
||||
err = kv.Save(key, value)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = loader.loadSegment(req, true)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSegmentLoader_notOnService(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue