Make querynode LoadSegments interface idempotent (#17109)

This PR makes the following changes:
* separate LoadSegmentsTask into two phases: PreExecute and Execute
* filters out segments that are already loaded in PreExecute phase

Signed-off-by: Letian Jiang <letian.jiang@zilliz.com>
pull/17120/head
Letian Jiang 2022-05-20 09:15:57 +08:00 committed by GitHub
parent 599763d9bf
commit 70825a35cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 58 additions and 10 deletions

View File

@ -633,14 +633,8 @@ func (l *loadSegmentsTask) OnEnqueue() error {
}
func (l *loadSegmentsTask) PreExecute(ctx context.Context) error {
return nil
}
func (l *loadSegmentsTask) Execute(ctx context.Context) error {
// TODO: support db
log.Info("LoadSegment start", zap.Int64("msgID", l.req.Base.MsgID))
log.Info("LoadSegmentTask PreExecute start", zap.Int64("msgID", l.req.Base.MsgID))
var err error
// init meta
collectionID := l.req.GetCollectionID()
l.node.historical.replica.addCollection(collectionID, l.req.GetSchema())
@ -656,13 +650,29 @@ func (l *loadSegmentsTask) Execute(ctx context.Context) error {
}
}
err = l.node.loader.loadSegment(l.req, segmentTypeSealed)
// filter segments that are already loaded in this querynode
var filteredInfos []*queryPb.SegmentLoadInfo
for _, info := range l.req.Infos {
if !l.node.historical.replica.hasSegment(info.SegmentID) {
filteredInfos = append(filteredInfos, info)
} else {
log.Debug("ignore segment that is already loaded", zap.Int64("segmentID", info.SegmentID))
}
}
l.req.Infos = filteredInfos
log.Info("LoadSegmentTask PreExecute done", zap.Int64("msgID", l.req.Base.MsgID))
return nil
}
func (l *loadSegmentsTask) Execute(ctx context.Context) error {
// TODO: support db
log.Info("LoadSegmentTask Execute start", zap.Int64("msgID", l.req.Base.MsgID))
err := l.node.loader.loadSegment(l.req, segmentTypeSealed)
if err != nil {
log.Warn(err.Error())
return err
}
log.Info("LoadSegments done", zap.Int64("msgID", l.req.Base.MsgID))
log.Info("LoadSegmentTask Execute done", zap.Int64("msgID", l.req.Base.MsgID))
return nil
}

View File

@ -574,6 +574,44 @@ func TestTask_loadSegmentsTask(t *testing.T) {
assert.NoError(t, err)
})
t.Run("test repeated load", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)
fieldBinlog, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
assert.NoError(t, err)
req := &querypb.LoadSegmentsRequest{
Base: genCommonMsgBase(commonpb.MsgType_LoadSegments),
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: defaultSegmentID,
PartitionID: defaultPartitionID,
CollectionID: defaultCollectionID,
BinlogPaths: fieldBinlog,
},
},
}
task := loadSegmentsTask{
req: req,
node: node,
}
// execute loadSegmentsTask twice
err = task.PreExecute(ctx)
assert.NoError(t, err)
err = task.Execute(ctx)
assert.NoError(t, err)
err = task.PreExecute(ctx)
assert.NoError(t, err)
err = task.Execute(ctx)
assert.NoError(t, err)
// expected only one segment in replica
num := node.historical.replica.getSegmentNum()
assert.Equal(t, 1, num)
})
t.Run("test execute grpc error", func(t *testing.T) {
node, err := genSimpleQueryNode(ctx)
assert.NoError(t, err)