mirror of https://github.com/milvus-io/milvus.git
enhance: keylock object background recycling (#38805)
issue: #38587 Signed-off-by: SimFG <bang.fu@zilliz.com>pull/39165/head
parent
357eaf0d71
commit
86d665a50f
|
@ -12,6 +12,7 @@ require (
|
|||
github.com/containerd/cgroups/v3 v3.0.3
|
||||
github.com/expr-lang/expr v1.15.7
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
|
||||
github.com/jolestar/go-commons-pool/v2 v2.1.2
|
||||
github.com/json-iterator/go v1.1.12
|
||||
github.com/klauspost/compress v1.17.7
|
||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b
|
||||
|
|
|
@ -190,6 +190,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE
|
|||
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
|
||||
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
|
||||
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
|
||||
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
|
||||
github.com/frankban/quicktest v1.2.2/go.mod h1:Qh/WofXFeiAFII1aEBu529AtJo6Zg2VHscnEsbBnJ20=
|
||||
github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o=
|
||||
github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y=
|
||||
|
@ -402,6 +404,8 @@ github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7
|
|||
github.com/jhump/protoreflect v1.11.0/go.mod h1:U7aMIjN0NWq9swDP7xDdoMfRHb35uiuTd3Z9nFXJf5E=
|
||||
github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
|
||||
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
|
||||
github.com/jolestar/go-commons-pool/v2 v2.1.2 h1:E+XGo58F23t7HtZiC/W6jzO2Ux2IccSH/yx4nD+J1CM=
|
||||
github.com/jolestar/go-commons-pool/v2 v2.1.2/go.mod h1:r4NYccrkS5UqP1YQI1COyTZ9UjPJAAGTUxzcsK1kqhY=
|
||||
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
|
||||
github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ=
|
||||
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
|
||||
|
|
|
@ -17,13 +17,35 @@
|
|||
package lock
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
pool "github.com/jolestar/go-commons-pool/v2"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
)
|
||||
|
||||
var (
|
||||
ctx = context.Background()
|
||||
lockPoolFactory = pool.NewPooledObjectFactorySimple(func(ctx2 context.Context) (interface{}, error) {
|
||||
return newRefLock(), nil
|
||||
})
|
||||
lockerPoolConfig = &pool.ObjectPoolConfig{
|
||||
LIFO: pool.DefaultLIFO,
|
||||
MaxTotal: 64,
|
||||
MaxIdle: 64,
|
||||
MinIdle: pool.DefaultMinIdle,
|
||||
MinEvictableIdleTime: pool.DefaultMinEvictableIdleTime,
|
||||
SoftMinEvictableIdleTime: pool.DefaultSoftMinEvictableIdleTime,
|
||||
NumTestsPerEvictionRun: pool.DefaultNumTestsPerEvictionRun,
|
||||
EvictionPolicyName: pool.DefaultEvictionPolicyName,
|
||||
EvictionContext: ctx,
|
||||
BlockWhenExhausted: false,
|
||||
}
|
||||
refLockPoolPool = pool.NewObjectPool(ctx, lockPoolFactory, lockerPoolConfig)
|
||||
)
|
||||
|
||||
type RefLock struct {
|
||||
mutex sync.RWMutex
|
||||
refCounter int
|
||||
|
@ -33,8 +55,12 @@ func (m *RefLock) ref() {
|
|||
m.refCounter++
|
||||
}
|
||||
|
||||
func (m *RefLock) unref() {
|
||||
m.refCounter--
|
||||
func (m *RefLock) unref() bool {
|
||||
if m.refCounter > 0 {
|
||||
m.refCounter--
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func newRefLock() *RefLock {
|
||||
|
@ -66,7 +92,14 @@ func (k *KeyLock[K]) Lock(key K) {
|
|||
k.keyLocksMutex.Unlock()
|
||||
keyLock.mutex.Lock()
|
||||
} else {
|
||||
newKLock := newRefLock()
|
||||
obj, err := refLockPoolPool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error("BorrowObject failed", zap.Error(err))
|
||||
k.keyLocksMutex.Unlock()
|
||||
return
|
||||
}
|
||||
newKLock := obj.(*RefLock)
|
||||
// newKLock := newRefLock()
|
||||
newKLock.mutex.Lock()
|
||||
k.refLocks[key] = newKLock
|
||||
newKLock.ref()
|
||||
|
@ -86,6 +119,7 @@ func (k *KeyLock[K]) Unlock(lockedKey K) {
|
|||
}
|
||||
keyLock.unref()
|
||||
if keyLock.refCounter == 0 {
|
||||
_ = refLockPoolPool.ReturnObject(ctx, keyLock)
|
||||
delete(k.refLocks, lockedKey)
|
||||
}
|
||||
keyLock.mutex.Unlock()
|
||||
|
@ -100,7 +134,14 @@ func (k *KeyLock[K]) RLock(key K) {
|
|||
k.keyLocksMutex.Unlock()
|
||||
keyLock.mutex.RLock()
|
||||
} else {
|
||||
newKLock := newRefLock()
|
||||
obj, err := refLockPoolPool.BorrowObject(ctx)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Error("BorrowObject failed", zap.Error(err))
|
||||
k.keyLocksMutex.Unlock()
|
||||
return
|
||||
}
|
||||
newKLock := obj.(*RefLock)
|
||||
// newKLock := newRefLock()
|
||||
newKLock.mutex.RLock()
|
||||
k.refLocks[key] = newKLock
|
||||
newKLock.ref()
|
||||
|
@ -120,6 +161,7 @@ func (k *KeyLock[K]) RUnlock(lockedKey K) {
|
|||
}
|
||||
keyLock.unref()
|
||||
if keyLock.refCounter == 0 {
|
||||
_ = refLockPoolPool.ReturnObject(ctx, keyLock)
|
||||
delete(k.refLocks, lockedKey)
|
||||
}
|
||||
keyLock.mutex.RUnlock()
|
||||
|
|
|
@ -67,3 +67,18 @@ func TestKeyRLock(t *testing.T) {
|
|||
wg.Wait()
|
||||
assert.Equal(t, keyLock.size(), 0)
|
||||
}
|
||||
|
||||
func TestNewKeyLock(t *testing.T) {
|
||||
keyLock := NewKeyLock[string]()
|
||||
keyLock.Lock("a")
|
||||
keyLock.Lock("b")
|
||||
|
||||
keyLock.Unlock("a")
|
||||
keyLock.Unlock("b")
|
||||
|
||||
assert.Equal(t, 0, keyLock.size())
|
||||
keyLock.keyLocksMutex.Lock()
|
||||
keyLen := len(keyLock.refLocks)
|
||||
keyLock.keyLocksMutex.Unlock()
|
||||
assert.Equal(t, 0, keyLen)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue