mirror of https://github.com/milvus-io/milvus.git
enhance: change Allocator to generic (#33581)
Signed-off-by: sunby <sunbingyi1992@gmail.com>pull/33617/head
parent
c6f8a73bb2
commit
d610fdf033
|
@ -60,29 +60,27 @@ func (r Resource) Le(limit *Resource) bool {
|
||||||
return r.Memory <= limit.Memory && r.CPU <= limit.CPU && r.Disk <= limit.Disk
|
return r.Memory <= limit.Memory && r.CPU <= limit.CPU && r.Disk <= limit.Disk
|
||||||
}
|
}
|
||||||
|
|
||||||
type Allocator interface {
|
type Allocator[T comparable] interface {
|
||||||
// Allocate allocates the resource, returns true if the resource is allocated. If allocation failed, returns the short resource.
|
// Allocate allocates the resource, returns true if the resource is allocated. If allocation failed, returns the short resource.
|
||||||
// The short resource is a positive value, e.g., if there is additional 8 bytes in disk needed, returns (0, 0, 8).
|
// The short resource is a positive value, e.g., if there is additional 8 bytes in disk needed, returns (0, 0, 8).
|
||||||
Allocate(id string, r *Resource) (allocated bool, short *Resource)
|
Allocate(id T, r *Resource) (allocated bool, short *Resource)
|
||||||
// Release releases the resource
|
// Release releases the resource
|
||||||
Release(id string)
|
Release(id T)
|
||||||
// Used returns the used resource
|
// Used returns the used resource
|
||||||
Used() Resource
|
Used() Resource
|
||||||
// Inspect returns the allocated resources
|
// Inspect returns the allocated resources
|
||||||
Inspect() map[string]*Resource
|
Inspect() map[T]*Resource
|
||||||
}
|
}
|
||||||
|
|
||||||
type FixedSizeAllocator struct {
|
type FixedSizeAllocator[T comparable] struct {
|
||||||
limit *Resource
|
limit *Resource
|
||||||
|
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
used Resource
|
used Resource
|
||||||
allocs map[string]*Resource
|
allocs map[T]*Resource
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Allocator = (*FixedSizeAllocator)(nil)
|
func (a *FixedSizeAllocator[T]) Allocate(id T, r *Resource) (allocated bool, short *Resource) {
|
||||||
|
|
||||||
func (a *FixedSizeAllocator) Allocate(id string, r *Resource) (allocated bool, short *Resource) {
|
|
||||||
a.lock.Lock()
|
a.lock.Lock()
|
||||||
defer a.lock.Unlock()
|
defer a.lock.Unlock()
|
||||||
if a.used.Add(r).Le(a.limit) {
|
if a.used.Add(r).Le(a.limit) {
|
||||||
|
@ -99,7 +97,7 @@ func (a *FixedSizeAllocator) Allocate(id string, r *Resource) (allocated bool, s
|
||||||
return false, short
|
return false, short
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *FixedSizeAllocator) Release(id string) {
|
func (a *FixedSizeAllocator[T]) Release(id T) {
|
||||||
a.lock.Lock()
|
a.lock.Lock()
|
||||||
defer a.lock.Unlock()
|
defer a.lock.Unlock()
|
||||||
r, ok := a.allocs[id]
|
r, ok := a.allocs[id]
|
||||||
|
@ -110,36 +108,34 @@ func (a *FixedSizeAllocator) Release(id string) {
|
||||||
a.used.Sub(r)
|
a.used.Sub(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *FixedSizeAllocator) Used() Resource {
|
func (a *FixedSizeAllocator[T]) Used() Resource {
|
||||||
a.lock.RLock()
|
a.lock.RLock()
|
||||||
defer a.lock.RUnlock()
|
defer a.lock.RUnlock()
|
||||||
return a.used
|
return a.used
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *FixedSizeAllocator) Inspect() map[string]*Resource {
|
func (a *FixedSizeAllocator[T]) Inspect() map[T]*Resource {
|
||||||
a.lock.RLock()
|
a.lock.RLock()
|
||||||
defer a.lock.RUnlock()
|
defer a.lock.RUnlock()
|
||||||
return maps.Clone(a.allocs)
|
return maps.Clone(a.allocs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFixedSizeAllocator(limit *Resource) *FixedSizeAllocator {
|
func NewFixedSizeAllocator[T comparable](limit *Resource) *FixedSizeAllocator[T] {
|
||||||
return &FixedSizeAllocator{
|
return &FixedSizeAllocator[T]{
|
||||||
limit: limit,
|
limit: limit,
|
||||||
allocs: make(map[string]*Resource),
|
allocs: make(map[T]*Resource),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// PhysicalAwareFixedSizeAllocator allocates resources with additional consideration of physical resource usage.
|
// PhysicalAwareFixedSizeAllocator allocates resources with additional consideration of physical resource usage.
|
||||||
type PhysicalAwareFixedSizeAllocator struct {
|
type PhysicalAwareFixedSizeAllocator[T comparable] struct {
|
||||||
FixedSizeAllocator
|
FixedSizeAllocator[T]
|
||||||
|
|
||||||
hwLimit *Resource
|
hwLimit *Resource
|
||||||
dir string // watching directory for disk usage, probably got by paramtable.Get().LocalStorageCfg.Path.GetValue()
|
dir string // watching directory for disk usage, probably got by paramtable.Get().LocalStorageCfg.Path.GetValue()
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Allocator = (*PhysicalAwareFixedSizeAllocator)(nil)
|
func (a *PhysicalAwareFixedSizeAllocator[T]) Allocate(id T, r *Resource) (allocated bool, short *Resource) {
|
||||||
|
|
||||||
func (a *PhysicalAwareFixedSizeAllocator) Allocate(id string, r *Resource) (allocated bool, short *Resource) {
|
|
||||||
memoryUsage := int64(hardware.GetUsedMemoryCount())
|
memoryUsage := int64(hardware.GetUsedMemoryCount())
|
||||||
diskUsage := int64(0)
|
diskUsage := int64(0)
|
||||||
if usageStats, err := disk.Usage(a.dir); err != nil {
|
if usageStats, err := disk.Usage(a.dir); err != nil {
|
||||||
|
@ -159,11 +155,11 @@ func (a *PhysicalAwareFixedSizeAllocator) Allocate(id string, r *Resource) (allo
|
||||||
return false, expected.Diff(a.hwLimit)
|
return false, expected.Diff(a.hwLimit)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPhysicalAwareFixedSizeAllocator(limit *Resource, hwMemoryLimit, hwDiskLimit int64, dir string) *PhysicalAwareFixedSizeAllocator {
|
func NewPhysicalAwareFixedSizeAllocator[T comparable](limit *Resource, hwMemoryLimit, hwDiskLimit int64, dir string) *PhysicalAwareFixedSizeAllocator[T] {
|
||||||
return &PhysicalAwareFixedSizeAllocator{
|
return &PhysicalAwareFixedSizeAllocator[T]{
|
||||||
FixedSizeAllocator: FixedSizeAllocator{
|
FixedSizeAllocator: FixedSizeAllocator[T]{
|
||||||
limit: limit,
|
limit: limit,
|
||||||
allocs: make(map[string]*Resource),
|
allocs: make(map[T]*Resource),
|
||||||
},
|
},
|
||||||
hwLimit: &Resource{Memory: hwMemoryLimit, Disk: hwDiskLimit},
|
hwLimit: &Resource{Memory: hwMemoryLimit, Disk: hwDiskLimit},
|
||||||
dir: dir,
|
dir: dir,
|
||||||
|
|
|
@ -27,7 +27,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFixedSizeAllocator(t *testing.T) {
|
func TestFixedSizeAllocator(t *testing.T) {
|
||||||
a := NewFixedSizeAllocator(&Resource{100, 100, 100})
|
a := NewFixedSizeAllocator[string](&Resource{100, 100, 100})
|
||||||
|
|
||||||
allocated, _ := a.Allocate("a1", &Resource{10, 10, 10})
|
allocated, _ := a.Allocate("a1", &Resource{10, 10, 10})
|
||||||
assert.Equal(t, true, allocated)
|
assert.Equal(t, true, allocated)
|
||||||
|
@ -46,7 +46,7 @@ func TestFixedSizeAllocator(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFixedSizeAllocatorRace(t *testing.T) {
|
func TestFixedSizeAllocatorRace(t *testing.T) {
|
||||||
a := NewFixedSizeAllocator(&Resource{100, 100, 100})
|
a := NewFixedSizeAllocator[string](&Resource{100, 100, 100})
|
||||||
wg := new(sync.WaitGroup)
|
wg := new(sync.WaitGroup)
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
@ -64,7 +64,7 @@ func TestFixedSizeAllocatorRace(t *testing.T) {
|
||||||
func TestPhysicalAwareFixedSizeAllocator(t *testing.T) {
|
func TestPhysicalAwareFixedSizeAllocator(t *testing.T) {
|
||||||
hwMemoryLimit := int64(float32(hardware.GetMemoryCount()) * 0.9)
|
hwMemoryLimit := int64(float32(hardware.GetMemoryCount()) * 0.9)
|
||||||
hwDiskLimit := int64(1<<63 - 1)
|
hwDiskLimit := int64(1<<63 - 1)
|
||||||
a := NewPhysicalAwareFixedSizeAllocator(&Resource{100, 100, 100}, hwMemoryLimit, hwDiskLimit, "/tmp")
|
a := NewPhysicalAwareFixedSizeAllocator[string](&Resource{100, 100, 100}, hwMemoryLimit, hwDiskLimit, "/tmp")
|
||||||
|
|
||||||
allocated, _ := a.Allocate("a1", &Resource{10, 10, 10})
|
allocated, _ := a.Allocate("a1", &Resource{10, 10, 10})
|
||||||
assert.Equal(t, true, allocated)
|
assert.Equal(t, true, allocated)
|
||||||
|
|
Loading…
Reference in New Issue