enhance: Unify LoadStateLock RLock & PinIf (#39206)

Related to #39205

This PR merges `RLock` & `PinIfNotReleased` into a single `PinIf` function,
preventing a segment from being released before every read operation on it has finished.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/38455/head^2
congqixia 2025-01-14 18:38:59 +08:00 committed by GitHub
parent da07993082
commit d89768f9e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 166 additions and 117 deletions

View File

@ -863,6 +863,7 @@ common:
threshold:
info: 500 # minimum milliseconds for printing durations in info level
warn: 1000 # minimum milliseconds for printing durations in warn level
maxWLockConditionalWaitTime: 600 # maximum seconds for waiting wlock conditional
storage:
scheme: s3
enablev2: false

View File

@ -403,7 +403,7 @@ func (s *LocalSegment) initializeSegment() error {
// Provide ONLY the read lock operations,
// don't make `ptrLock` public to avoid abusing of the mutex.
func (s *LocalSegment) PinIfNotReleased() error {
if !s.ptrLock.PinIfNotReleased() {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
return nil
@ -419,10 +419,10 @@ func (s *LocalSegment) InsertCount() int64 {
func (s *LocalSegment) RowNum() int64 {
// if segment is not loaded, return 0 (maybe not loaded or release by lru)
if !s.ptrLock.RLockIf(state.IsDataLoaded) {
if !s.ptrLock.PinIf(state.IsDataLoaded) {
return 0
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
rowNum := s.rowNum.Load()
if rowNum < 0 {
@ -436,10 +436,10 @@ func (s *LocalSegment) RowNum() int64 {
}
func (s *LocalSegment) MemSize() int64 {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return 0
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
memSize := s.memSize.Load()
if memSize < 0 {
@ -470,10 +470,10 @@ func (s *LocalSegment) ExistIndex(fieldID int64) bool {
}
func (s *LocalSegment) HasRawData(fieldID int64) bool {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return false
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
return s.csegment.HasRawData(fieldID)
}
@ -500,11 +500,11 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *segcore.SearchRequ
zap.String("segmentType", s.segmentType.String()),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
hasIndex := s.ExistIndex(searchReq.SearchFieldID())
log = log.With(zap.Bool("withIndex", hasIndex))
@ -522,11 +522,11 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *segcore.SearchRequ
}
func (s *LocalSegment) retrieve(ctx context.Context, plan *segcore.RetrievePlan, log *zap.Logger) (*segcore.RetrieveResult, error) {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
log.Debug("begin to retrieve")
@ -569,11 +569,11 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *segcore.RetrievePlan)
}
func (s *LocalSegment) retrieveByOffsets(ctx context.Context, plan *segcore.RetrievePlanWithOffsets, log *zap.Logger) (*segcore.RetrieveResult, error) {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
// TODO: check if the segment is readable but not released. too many related logic need to be refactor.
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
log.Debug("begin to retrieve by offsets")
tr := timerecord.NewTimeRecorder("cgoRetrieveByOffsets")
@ -631,10 +631,10 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []
if s.Type() != SegmentTypeGrowing {
return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String())
}
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
var result *segcore.InsertResult
var err error
@ -678,10 +678,10 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKe
if primaryKeys.Len() == 0 {
return nil
}
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
var err error
GetDynamicPool().Submit(func() (any, error) {
@ -715,10 +715,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {
rowCount := loadInfo.GetNumOfRows()
fields := loadInfo.GetBinlogPaths()
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
@ -759,10 +759,10 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {
}
func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog) error {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.ID(), fieldID))
defer sp.End()
@ -815,10 +815,10 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun
}
func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fields []*datapb.FieldBinlog) error {
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
log := log.Ctx(ctx).WithLazy(
zap.Int64("collectionID", s.Collection()),
@ -855,10 +855,10 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
pks, tss := deltaData.DeletePks(), deltaData.DeleteTimestamps()
rowNum := deltaData.DeleteRowCount()
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
@ -1100,10 +1100,10 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F
zap.Int64("segmentID", s.ID()),
zap.Int64("fieldID", indexInfo.FieldID),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
@ -1138,10 +1138,10 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
zap.Int64("fieldID", fieldID),
zap.Bool("mmapEnabled", mmapEnabled),
)
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if !s.ptrLock.PinIf(state.IsNotReleased) {
return
}
defer s.ptrLock.RUnlock()
defer s.ptrLock.Unpin()
var status C.CStatus
@ -1165,10 +1165,10 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap
// the state transition of segment in segment loader will blocked.
// add a waiter to avoid it.
s.ptrLock.BlockUntilDataLoadedOrReleased()
if !s.ptrLock.RLockIf(state.IsNotReleased) {
if s.PinIfNotReleased() != nil {
return nil, nil
}
defer s.ptrLock.RUnlock()
defer s.Unpin()
cFieldID := C.int64_t(fieldID)
cMmapEnabled := C.bool(mmapEnabled)

View File

@ -3,13 +3,18 @@ package state
import (
"fmt"
"sync"
"time"
"github.com/cockroachdb/errors"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type loadStateEnum int
func noop() {}
// LoadState represent the state transition of segment.
// LoadStateOnlyMeta: segment is created with meta, but not loaded.
// LoadStateDataLoading: segment is loading data.
@ -69,32 +74,17 @@ type LoadStateLock struct {
}
// RLockIfNotReleased locks the segment if the state is not released.
func (ls *LoadStateLock) RLockIf(pred StatePredicate) bool {
ls.mu.RLock()
if !pred(ls.state) {
ls.mu.RUnlock()
return false
}
return true
}
// RUnlock unlocks the segment.
func (ls *LoadStateLock) RUnlock() {
ls.mu.RUnlock()
}
// PinIfNotReleased pin the segment into memory, avoid ReleaseAll to release it.
func (ls *LoadStateLock) PinIfNotReleased() bool {
func (ls *LoadStateLock) PinIf(pred StatePredicate) bool {
ls.mu.RLock()
defer ls.mu.RUnlock()
if ls.state == LoadStateReleased {
if !pred(ls.state) {
return false
}
ls.refCnt.Inc()
return true
}
// Unpin unpin the segment, then segment can be released by ReleaseAll.
// Unpin unlocks the segment.
func (ls *LoadStateLock) Unpin() {
ls.mu.RLock()
defer ls.mu.RUnlock()
@ -108,6 +98,12 @@ func (ls *LoadStateLock) Unpin() {
}
}
// PinIfNotReleased pins the segment if the state is not released.
// Syntactic sugar for PinIf(IsNotReleased).
func (ls *LoadStateLock) PinIfNotReleased() bool {
return ls.PinIf(IsNotReleased)
}
// StartLoadData starts load segment data
// Fast fail if segment is not in LoadStateOnlyMeta.
func (ls *LoadStateLock) StartLoadData() (LoadStateLockGuard, error) {
@ -129,76 +125,82 @@ func (ls *LoadStateLock) StartLoadData() (LoadStateLockGuard, error) {
// StartReleaseData wait until the segment is releasable and starts releasing segment data.
func (ls *LoadStateLock) StartReleaseData() (g LoadStateLockGuard) {
ls.cv.L.Lock()
defer ls.cv.L.Unlock()
ls.waitUntilCanReleaseData()
switch ls.state {
case LoadStateDataLoaded:
ls.state = LoadStateDataReleasing
ls.cv.Broadcast()
return newLoadStateLockGuard(ls, LoadStateDataLoaded, LoadStateOnlyMeta)
case LoadStateOnlyMeta:
// already transit to target state, do nothing.
return nil
case LoadStateReleased:
// do nothing for empty segment.
return nil
default:
panic(fmt.Sprintf("unreachable code: invalid state when releasing data, %s", ls.state.String()))
}
ls.waitOrPanic(ls.canReleaseData, func() {
switch ls.state {
case LoadStateDataLoaded:
ls.state = LoadStateDataReleasing
ls.cv.Broadcast()
g = newLoadStateLockGuard(ls, LoadStateDataLoaded, LoadStateOnlyMeta)
case LoadStateOnlyMeta:
// already transit to target state, do nothing.
g = nil
case LoadStateReleased:
// do nothing for empty segment.
g = nil
default:
panic(fmt.Sprintf("unreachable code: invalid state when releasing data, %s", ls.state.String()))
}
})
return g
}
// StartReleaseAll wait until the segment is releasable and starts releasing all segment.
func (ls *LoadStateLock) StartReleaseAll() (g LoadStateLockGuard) {
ls.cv.L.Lock()
defer ls.cv.L.Unlock()
ls.waitOrPanic(ls.canReleaseAll, func() {
switch ls.state {
case LoadStateDataLoaded:
ls.state = LoadStateReleased
ls.cv.Broadcast()
g = newNopLoadStateLockGuard()
case LoadStateOnlyMeta:
ls.state = LoadStateReleased
ls.cv.Broadcast()
g = newNopLoadStateLockGuard()
case LoadStateReleased:
// already transit to target state, do nothing.
g = nil
default:
panic(fmt.Sprintf("unreachable code: invalid state when releasing data, %s", ls.state.String()))
}
})
ls.waitUntilCanReleaseAll()
switch ls.state {
case LoadStateDataLoaded:
ls.state = LoadStateReleased
ls.cv.Broadcast()
return newNopLoadStateLockGuard()
case LoadStateOnlyMeta:
ls.state = LoadStateReleased
ls.cv.Broadcast()
return newNopLoadStateLockGuard()
case LoadStateReleased:
// already transit to target state, do nothing.
return nil
default:
panic(fmt.Sprintf("unreachable code: invalid state when releasing data, %s", ls.state.String()))
}
return g
}
// BlockUntilDataLoadedOrReleased blocks until the segment is loaded or released.
func (ls *LoadStateLock) BlockUntilDataLoadedOrReleased() {
ls.cv.L.Lock()
defer ls.cv.L.Unlock()
for ls.state != LoadStateDataLoaded && ls.state != LoadStateReleased {
ls.cv.Wait()
}
ls.waitOrPanic(func(state loadStateEnum) bool {
return state == LoadStateDataLoaded || state == LoadStateReleased
}, noop)
}
// waitUntilCanReleaseData waits until segment is release data able.
func (ls *LoadStateLock) waitUntilCanReleaseData() {
state := ls.state
for state != LoadStateDataLoaded && state != LoadStateOnlyMeta && state != LoadStateReleased {
ls.cv.Wait()
state = ls.state
}
func (ls *LoadStateLock) canReleaseData(state loadStateEnum) bool {
return state == LoadStateDataLoaded || state == LoadStateOnlyMeta || state == LoadStateReleased
}
// waitUntilCanReleaseAll waits until segment is releasable.
func (ls *LoadStateLock) waitUntilCanReleaseAll() {
state := ls.state
for (state != LoadStateDataLoaded && state != LoadStateOnlyMeta && state != LoadStateReleased) || ls.refCnt.Load() != 0 {
ls.cv.Wait()
state = ls.state
func (ls *LoadStateLock) canReleaseAll(state loadStateEnum) bool {
return (state == LoadStateDataLoaded || state == LoadStateOnlyMeta || state == LoadStateReleased) && ls.refCnt.Load() == 0
}
// waitOrPanic waits until ready reports true for the current load state and
// then invokes then, with ls.cv.L held for both the check and the callback.
//
// The wait runs in a helper goroutine so the caller can enforce a deadline:
// if common.locks.maxWLockConditionalWaitTime elapses before the helper
// finishes, waitOrPanic panics instead of blocking forever.
//
// ready is called with ls.cv.L held and is re-evaluated after every wake-up
// of ls.cv. then is called at most once, and never after the deadline has
// been reported.
func (ls *LoadStateLock) waitOrPanic(ready func(state loadStateEnum) bool, then func()) {
	maxWaitTime := paramtable.Get().CommonCfg.MaxWLockConditionalWaitTime.GetAsDuration(time.Second)

	done := make(chan struct{})
	// abandoned tells the helper goroutine that the caller has already timed
	// out. Without it, a recovered panic would leave the helper parked on the
	// condition variable, free to run then() and mutate the state long after
	// the caller gave up.
	var abandoned atomic.Bool

	go func() {
		ls.cv.L.Lock()
		defer ls.cv.L.Unlock()
		defer close(done)

		for !ready(ls.state) {
			ls.cv.Wait()
			if abandoned.Load() {
				return
			}
		}
		if abandoned.Load() {
			return
		}
		then()
	}()

	// Use an explicit timer instead of time.After so it is released as soon as
	// the wait succeeds, rather than lingering for the whole (potentially
	// multi-minute) timeout.
	timer := time.NewTimer(maxWaitTime)
	defer timer.Stop()

	select {
	case <-done:
	case <-timer.C:
		// Deliberately do not take ls.cv.L here: the timeout may have been
		// caused by the lock itself being stuck, and the panic must still fire.
		abandoned.Store(true)
		// Wake the helper (if it is parked) so it can observe the flag and exit.
		ls.cv.Broadcast()
		panic(fmt.Sprintf("max WLock wait time(%v) exceeded", maxWaitTime))
	}
}

View File

@ -6,9 +6,12 @@ import (
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
func TestLoadStateLoadData(t *testing.T) {
paramtable.Init()
l := NewLoadStateLock(LoadStateOnlyMeta)
// Test Load Data, roll back
g, err := l.StartLoadData()
@ -44,6 +47,7 @@ func TestLoadStateLoadData(t *testing.T) {
}
func TestStartReleaseData(t *testing.T) {
paramtable.Init()
l := NewLoadStateLock(LoadStateOnlyMeta)
// Test Release Data, nothing to do on only meta.
g := l.StartReleaseData()
@ -104,6 +108,7 @@ func TestStartReleaseData(t *testing.T) {
}
func TestBlockUntilDataLoadedOrReleased(t *testing.T) {
paramtable.Init()
l := NewLoadStateLock(LoadStateOnlyMeta)
ch := make(chan struct{})
go func() {
@ -122,6 +127,7 @@ func TestBlockUntilDataLoadedOrReleased(t *testing.T) {
}
func TestStartReleaseAll(t *testing.T) {
paramtable.Init()
l := NewLoadStateLock(LoadStateOnlyMeta)
// Test Release All, nothing to do on only meta.
g := l.StartReleaseAll()
@ -183,22 +189,52 @@ func TestStartReleaseAll(t *testing.T) {
assert.Equal(t, LoadStateReleased, l.state)
}
func TestRLock(t *testing.T) {
// TestWaitOrPanic checks both outcomes of waitOrPanic: it must return normally
// when the predicate already holds, and it must panic once the configured
// maximum conditional wait time has elapsed without the predicate holding.
func TestWaitOrPanic(t *testing.T) {
	paramtable.Init()

	// Shared predicate: satisfied only once the segment data is loaded.
	dataLoaded := func(state loadStateEnum) bool {
		return state == LoadStateDataLoaded
	}

	t.Run("normal", func(t *testing.T) {
		params := paramtable.Get()
		params.Save(params.CommonCfg.MaxWLockConditionalWaitTime.Key, "600")
		defer params.Reset(params.CommonCfg.MaxWLockConditionalWaitTime.Key)

		// The lock starts in the awaited state, so no waiting is required.
		lock := NewLoadStateLock(LoadStateDataLoaded)
		assert.NotPanics(t, func() {
			lock.waitOrPanic(dataLoaded, noop)
		})
	})

	t.Run("timeout_panic", func(t *testing.T) {
		params := paramtable.Get()
		params.Save(params.CommonCfg.MaxWLockConditionalWaitTime.Key, "1")
		defer params.Reset(params.CommonCfg.MaxWLockConditionalWaitTime.Key)

		// Nothing ever moves the lock to DataLoaded, so the wait must time out.
		lock := NewLoadStateLock(LoadStateOnlyMeta)
		assert.Panics(t, func() {
			lock.waitOrPanic(dataLoaded, noop)
		})
	})
}
func TestPinIf(t *testing.T) {
l := NewLoadStateLock(LoadStateOnlyMeta)
assert.True(t, l.RLockIf(IsNotReleased))
l.RUnlock()
assert.False(t, l.RLockIf(IsDataLoaded))
assert.True(t, l.PinIf(IsNotReleased))
l.Unpin()
assert.False(t, l.PinIf(IsDataLoaded))
l = NewLoadStateLock(LoadStateDataLoaded)
assert.True(t, l.RLockIf(IsNotReleased))
l.RUnlock()
assert.True(t, l.RLockIf(IsDataLoaded))
l.RUnlock()
assert.True(t, l.PinIf(IsNotReleased))
l.Unpin()
assert.True(t, l.PinIf(IsDataLoaded))
l.Unpin()
l = NewLoadStateLock(LoadStateOnlyMeta)
l.StartReleaseAll().Done(nil)
assert.False(t, l.RLockIf(IsNotReleased))
assert.False(t, l.RLockIf(IsDataLoaded))
assert.False(t, l.PinIf(IsNotReleased))
assert.False(t, l.PinIf(IsDataLoaded))
}
func TestPin(t *testing.T) {

View File

@ -254,9 +254,10 @@ type commonConfig struct {
MetricsPort ParamItem `refreshable:"false"`
// lock related params
EnableLockMetrics ParamItem `refreshable:"false"`
LockSlowLogInfoThreshold ParamItem `refreshable:"true"`
LockSlowLogWarnThreshold ParamItem `refreshable:"true"`
EnableLockMetrics ParamItem `refreshable:"false"`
LockSlowLogInfoThreshold ParamItem `refreshable:"true"`
LockSlowLogWarnThreshold ParamItem `refreshable:"true"`
MaxWLockConditionalWaitTime ParamItem `refreshable:"true"`
StorageScheme ParamItem `refreshable:"false"`
EnableStorageV2 ParamItem `refreshable:"false"`
@ -753,6 +754,15 @@ like the old password verification when updating the credential`,
}
p.LockSlowLogWarnThreshold.Init(base.mgr)
p.MaxWLockConditionalWaitTime = ParamItem{
Key: "common.locks.maxWLockConditionalWaitTime",
Version: "2.5.4",
DefaultValue: "600",
Doc: "maximum seconds for waiting wlock conditional",
Export: true,
}
p.MaxWLockConditionalWaitTime.Init(base.mgr)
p.EnableStorageV2 = ParamItem{
Key: "common.storage.enablev2",
Version: "2.3.1",