fix: fix mmap load (#31171)

issue: https://github.com/milvus-io/milvus/issues/31101

---------

Signed-off-by: sunby <sunbingyi1992@gmail.com>
pull/31179/head
Bingyi Sun 2024-03-13 10:49:04 +08:00 committed by GitHub
parent 87b3c25b15
commit 7b7187b465
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 44 additions and 31 deletions

View File

@ -121,38 +121,43 @@ func NewManager() *Manager {
segMgr := NewSegmentManager()
sf := singleflight.Group{}
return &Manager{
manager := &Manager{
Collection: NewCollectionManager(),
Segment: segMgr,
DiskCache: cache.NewLRUCache[int64, Segment](
int32(cacheMaxItemNum),
func(key int64) (Segment, bool) {
log.Debug("cache missed segment", zap.Int64("segmentID", key))
segMgr.mu.RLock()
defer segMgr.mu.RUnlock()
segment, ok := segMgr.sealedSegments[key]
if !ok {
// the segment has been released, just ignore it
return nil, false
}
info := segment.LoadInfo()
_, err, _ := sf.Do(fmt.Sprint(segment.ID()), func() (interface{}, error) {
err := loadSealedSegmentFields(context.Background(), segment.(*LocalSegment), info.BinlogPaths, info.GetNumOfRows(), WithLoadStatus(LoadStatusMapped))
return nil, err
})
if err != nil {
log.Warn("cache sealed segment failed", zap.Error(err))
return nil, false
}
return segment, true
},
func(key int64, segment Segment) {
log.Debug("evict segment from cache", zap.Int64("segmentID", key))
segment.Release(WithReleaseScope(ReleaseScopeData))
}),
}
manager.DiskCache = cache.NewLRUCache[int64, Segment](
int32(cacheMaxItemNum),
func(key int64) (Segment, bool) {
log.Debug("cache missed segment", zap.Int64("segmentID", key))
segMgr.mu.RLock()
defer segMgr.mu.RUnlock()
segment, ok := segMgr.sealedSegments[key]
if !ok {
// the segment has been released, just ignore it
return nil, false
}
info := segment.LoadInfo()
_, err, _ := sf.Do(fmt.Sprint(segment.ID()), func() (interface{}, error) {
collection := manager.Collection.Get(segment.Collection())
if collection == nil {
return nil, merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load segment fields")
}
err := loadSealedSegmentFields(context.Background(), collection, segment.(*LocalSegment), info.BinlogPaths, info.GetNumOfRows(), WithLoadStatus(LoadStatusMapped))
return nil, err
})
if err != nil {
log.Warn("cache sealed segment failed", zap.Error(err))
return nil, false
}
return segment, true
},
func(key int64, segment Segment) {
log.Debug("evict segment from cache", zap.Int64("segmentID", key))
segment.Release(WithReleaseScope(ReleaseScopeData))
})
return manager
}
type SegmentManager interface {

View File

@ -1008,7 +1008,7 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
}
}
}
if err := loadSealedSegmentFields(ctx, segment, fieldBinlogs, loadInfo.GetNumOfRows(), WithLoadStatus(loadStatus)); err != nil {
if err := loadSealedSegmentFields(ctx, collection, segment, fieldBinlogs, loadInfo.GetNumOfRows(), WithLoadStatus(loadStatus)); err != nil {
return err
}
// https://github.com/milvus-io/milvus/23654
@ -1064,11 +1064,19 @@ func (loader *segmentLoader) filterPKStatsBinlogs(fieldBinlogs []*datapb.FieldBi
return result, storage.DefaultStatsType
}
func loadSealedSegmentFields(ctx context.Context, segment *LocalSegment, fields []*datapb.FieldBinlog, rowCount int64, opts ...loadOption) error {
func loadSealedSegmentFields(ctx context.Context, collection *Collection, segment *LocalSegment, fields []*datapb.FieldBinlog, rowCount int64, opts ...loadOption) error {
options := newLoadOptions()
for _, opt := range opts {
opt(options)
}
runningGroup, _ := errgroup.WithContext(ctx)
for _, field := range fields {
fieldBinLog := field
fieldID := field.FieldID
mmapEnabled := common.IsFieldMmapEnabled(collection.Schema(), fieldID)
if mmapEnabled && options.LoadStatus == LoadStatusInMemory {
opts = append(opts, WithLoadStatus(LoadStatusMapped))
}
runningGroup.Go(func() error {
return segment.LoadFieldData(ctx,
fieldID,