fix: evicted segments in the serverlss mode(#31959) (#31961)

related: #31959
1. reset segment index status after evicting to lazyload=true
2. reset num_rows to null_opt

Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
pull/32109/head
Chun Han 2024-04-10 15:15:19 +08:00 committed by GitHub
parent c4806b69c4
commit f3f2a5a7e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 63 additions and 17 deletions

View File

@ -1129,7 +1129,7 @@ SegmentSealedImpl::ClearData() {
index_ready_bitset_.reset();
binlog_index_bitset_.reset();
system_ready_count_ = 0;
num_rows_ = 0;
num_rows_ = std::nullopt;
scalar_indexings_.clear();
vector_indexings_.clear();
insert_record_.clear();

View File

@ -993,6 +993,39 @@ func (_c *MockSegment_Release_Call) RunAndReturn(run func(...releaseOption)) *Mo
return _c
}
// ResetIndexesLazyLoad provides a mock function with given fields: lazyState
func (_m *MockSegment) ResetIndexesLazyLoad(lazyState bool) {
_m.Called(lazyState)
}
// MockSegment_ResetIndexesLazyLoad_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ResetIndexesLazyLoad'
type MockSegment_ResetIndexesLazyLoad_Call struct {
*mock.Call
}
// ResetIndexesLazyLoad is a helper method to define mock.On call
// - lazyState bool
func (_e *MockSegment_Expecter) ResetIndexesLazyLoad(lazyState interface{}) *MockSegment_ResetIndexesLazyLoad_Call {
return &MockSegment_ResetIndexesLazyLoad_Call{Call: _e.mock.On("ResetIndexesLazyLoad", lazyState)}
}
func (_c *MockSegment_ResetIndexesLazyLoad_Call) Run(run func(lazyState bool)) *MockSegment_ResetIndexesLazyLoad_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(bool))
})
return _c
}
func (_c *MockSegment_ResetIndexesLazyLoad_Call) Return() *MockSegment_ResetIndexesLazyLoad_Call {
_c.Call.Return()
return _c
}
func (_c *MockSegment_ResetIndexesLazyLoad_Call) RunAndReturn(run func(bool)) *MockSegment_ResetIndexesLazyLoad_Call {
_c.Call.Return(run)
return _c
}
// ResourceGroup provides a mock function with given fields:
func (_m *MockSegment) ResourceGroup() string {
ret := _m.Called()

View File

@ -463,6 +463,12 @@ func (s *LocalSegment) Indexes() []*IndexedFieldInfo {
return result
}
func (s *LocalSegment) ResetIndexesLazyLoad(lazyState bool) {
for _, indexInfo := range s.Indexes() {
indexInfo.LazyLoad = lazyState
}
}
func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error) {
/*
CStatus
@ -1129,6 +1135,14 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
opt(options)
}
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", indexInfo.GetFieldID()),
zap.Int64("indexID", indexInfo.GetIndexID()),
)
if options.LoadStatus == LoadStatusMeta {
s.addIndex(indexInfo.GetFieldID(), &IndexedFieldInfo{
FieldBinlog: &datapb.FieldBinlog{
@ -1149,14 +1163,6 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadIndex-%d-%d", s.ID(), indexInfo.GetFieldID()))
defer sp.End()
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", indexInfo.GetFieldID()),
zap.Int64("indexID", indexInfo.GetIndexID()),
)
loadIndexInfo, err := newLoadIndexInfo(ctx)
if err != nil {
return err
@ -1340,11 +1346,20 @@ func (s *LocalSegment) Release(opts ...releaseOption) {
// release will never fail
defer stateLockGuard.Done(nil)
log := log.With(zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.String("segmentType", s.segmentType.String()),
zap.Int64("insertCount", s.InsertCount()),
)
// wait all read ops finished
ptr := s.ptr
if options.Scope == ReleaseScopeData {
s.loadStatus.Store(string(LoadStatusMeta))
C.ClearSegmentData(ptr)
s.ResetIndexesLazyLoad(true)
log.Debug("release segment data done and the field indexes info has been set lazy load=true")
return
}
@ -1365,13 +1380,7 @@ func (s *LocalSegment) Release(opts ...releaseOption) {
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
}
log.Info("delete segment from memory",
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.String("segmentType", s.segmentType.String()),
zap.Int64("insertCount", s.InsertCount()),
)
log.Info("delete segment from memory")
}
// StartLoadData starts the loading process of the segment.

View File

@ -97,4 +97,5 @@ type Segment interface {
Search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error)
Retrieve(ctx context.Context, plan *RetrievePlan) (*segcorepb.RetrieveResults, error)
IsLazyLoad() bool
ResetIndexesLazyLoad(lazyState bool)
}

View File

@ -117,6 +117,9 @@ func (s *L0Segment) Indexes() []*IndexedFieldInfo {
return nil
}
func (s *L0Segment) ResetIndexesLazyLoad(lazyState bool) {
}
func (s *L0Segment) Type() SegmentType {
return s.segmentType
}

View File

@ -978,7 +978,7 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu
if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil {
return err
}
log := log.Ctx(ctx).With(zap.Int64("segmentID", segment.ID()))
tr := timerecord.NewTimeRecorder("segmentLoader.LoadIndex")
log.Info("load fields...",
zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),