fix: [2.4] Check resource when loading deltalogs (#37195) (#37263)

Cherry pick from master
pr: #37195
Related to #36887

`LoadDeltaLogs` API did not check memory usage. When system is under
high delete load pressure, this could result into OOM quit.

This PR add resource check for `LoadDeltaLogs` actions and separate
internal deltalog loading function with public one.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/37301/head
congqixia 2024-10-30 11:54:41 +08:00 committed by GitHub
parent ce7fbb9439
commit a2a51c489e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 22 additions and 2 deletions

View File

@ -735,7 +735,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
}
}
}
if err = loader.LoadDeltaLogs(ctx, segment, loadInfo.GetDeltalogs()); err != nil {
if err = loader.loadDeltalogs(ctx, segment, loadInfo.GetDeltalogs()); err != nil {
return errors.Wrap(err, "At LoadDeltaLogs")
}
@ -1369,7 +1369,9 @@ func (loader *segmentLoader) loadBloomFilter(ctx context.Context, segmentID int6
return nil
}
func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
// loadDeltalogs performs the internal actions of `LoadDeltaLogs`
// this function does not perform resource check and is meant be used among other load APIs.
func (loader *segmentLoader) loadDeltalogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadDeltalogs-%d", segment.ID()))
defer sp.End()
log := log.Ctx(ctx).With(
@ -1429,6 +1431,24 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
return nil
}
// LoadDeltaLogs load deltalog and write delta data into provided segment.
// it also executes resource protection logic in case of OOM.
func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, deltaLogs []*datapb.FieldBinlog) error {
loadInfo := &querypb.SegmentLoadInfo{
SegmentID: segment.ID(),
CollectionID: segment.Collection(),
Deltalogs: deltaLogs,
}
// Check memory & storage limit
requestResourceResult, err := loader.requestResource(ctx, loadInfo)
if err != nil {
log.Warn("request resource failed", zap.Error(err))
return err
}
defer loader.freeRequest(requestResourceResult.Resource)
return loader.loadDeltalogs(ctx, segment, deltaLogs)
}
func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo) error {
var needReset bool