milvus/internal/datanode/importv2/memory_allocator_test.go

229 lines
7.4 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importv2
import (
"math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// TestMemoryAllocatorBasicOperations tests basic memory allocation and release operations
func TestMemoryAllocatorBasicOperations(t *testing.T) {
// Create memory allocator with 1GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Test initial state
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
// Test memory allocation for task 1 using BlockingAllocate
ma.BlockingAllocate(1, 50*1024*1024) // 50MB for task 1
assert.Equal(t, int64(50*1024*1024), ma.(*memoryAllocator).usedMemory)
// Test memory allocation for task 2
ma.BlockingAllocate(2, 50*1024*1024) // 50MB for task 2
assert.Equal(t, int64(100*1024*1024), ma.(*memoryAllocator).usedMemory)
// Test memory release for task 1
ma.Release(1, 50*1024*1024)
assert.Equal(t, int64(50*1024*1024), ma.(*memoryAllocator).usedMemory)
// Test memory release for task 2
ma.Release(2, 50*1024*1024)
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
}
// TestMemoryAllocatorMemoryLimit tests memory limit enforcement
func TestMemoryAllocatorMemoryLimit(t *testing.T) {
// Create memory allocator with 1GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Get the memory limit based on system memory and configuration percentage
memoryLimit := ma.(*memoryAllocator).systemTotalMemory
// Use a reasonable test size that should be within limits
testSize := memoryLimit / 10 // Use 10% of available memory
// Allocate memory up to the limit
ma.BlockingAllocate(1, testSize)
assert.Equal(t, testSize, ma.(*memoryAllocator).usedMemory)
// Try to allocate more memory than available (this will block, so we test in a goroutine)
done := make(chan bool)
go func() {
ma.BlockingAllocate(2, testSize)
done <- true
}()
// Release the allocated memory to unblock the waiting allocation
ma.Release(1, testSize)
<-done
// Verify that the second allocation succeeded after release
assert.Equal(t, testSize, ma.(*memoryAllocator).usedMemory)
// Release the second allocation
ma.Release(2, testSize)
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
}
// TestMemoryAllocatorConcurrentAccess tests concurrent memory allocation and release
func TestMemoryAllocatorConcurrentAccess(t *testing.T) {
// Create memory allocator with 1GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Test concurrent memory requests
done := make(chan bool, 10)
for i := 0; i < 10; i++ {
taskID := int64(i + 1)
go func() {
ma.BlockingAllocate(taskID, 50*1024*1024) // 50MB each
ma.Release(taskID, 50*1024*1024)
done <- true
}()
}
// Wait for all goroutines to complete
for i := 0; i < 10; i++ {
<-done
}
// Verify final state - should be 0 since all allocations were released
finalMemory := ma.(*memoryAllocator).usedMemory
assert.Equal(t, int64(0), finalMemory)
}
// TestMemoryAllocatorNegativeRelease tests handling of negative memory release
func TestMemoryAllocatorNegativeRelease(t *testing.T) {
// Create memory allocator with 1GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Allocate some memory
ma.BlockingAllocate(1, 100*1024*1024) // 100MB
assert.Equal(t, int64(100*1024*1024), ma.(*memoryAllocator).usedMemory)
// Release more than allocated (should not go negative)
ma.Release(1, 200*1024*1024) // 200MB
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory) // Should be reset to 0
}
// TestMemoryAllocatorMultipleTasks tests memory management for multiple tasks
func TestMemoryAllocatorMultipleTasks(t *testing.T) {
// Create memory allocator with 1GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024 * 2)
// Allocate memory for multiple tasks with smaller sizes
taskIDs := []int64{1, 2, 3, 4, 5}
sizes := []int64{20, 30, 25, 15, 35} // Total: 125MB
for i, taskID := range taskIDs {
ma.BlockingAllocate(taskID, sizes[i]*1024*1024)
}
// Verify total used memory
expectedTotal := int64(0)
for _, size := range sizes {
expectedTotal += size * 1024 * 1024
}
assert.Equal(t, expectedTotal, ma.(*memoryAllocator).usedMemory)
// Release memory for specific tasks
ma.Release(2, 30*1024*1024) // Release task 2
ma.Release(4, 15*1024*1024) // Release task 4
// Verify updated memory usage
expectedTotal = (20 + 25 + 35) * 1024 * 1024 // 80MB
assert.Equal(t, expectedTotal, ma.(*memoryAllocator).usedMemory)
// Release remaining tasks
ma.Release(1, 20*1024*1024)
ma.Release(3, 25*1024*1024)
ma.Release(5, 35*1024*1024)
// Verify final state
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
}
// TestMemoryAllocatorZeroSize tests handling of zero size allocations
func TestMemoryAllocatorZeroSize(t *testing.T) {
// Create memory allocator
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Test zero size allocation
ma.BlockingAllocate(1, 0)
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
// Test zero size release
ma.Release(1, 0)
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
}
// TestMemoryAllocatorSimple tests basic functionality without external dependencies
func TestMemoryAllocatorSimple(t *testing.T) {
// Create memory allocator with 1GB system memory
ma := NewMemoryAllocator(1024 * 1024 * 1024)
// Test initial state
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
// Test memory allocation
ma.BlockingAllocate(1, 50*1024*1024) // 50MB
assert.Equal(t, int64(50*1024*1024), ma.(*memoryAllocator).usedMemory)
// Test memory release
ma.Release(1, 50*1024*1024)
assert.Equal(t, int64(0), ma.(*memoryAllocator).usedMemory)
}
// TestMemoryAllocatorMassiveConcurrency tests massive concurrent memory allocation and release
func TestMemoryAllocatorMassiveConcurrency(t *testing.T) {
// Create memory allocator with 1.6GB system memory
totalMemory := int64(16 * 1024 * 1024 * 1024) // 16GB * 10%
ma := NewMemoryAllocator(totalMemory)
const numTasks = 200
var wg sync.WaitGroup
wg.Add(numTasks)
// Start concurrent allocation and release
for i := 0; i < numTasks; i++ {
taskID := int64(i + 1)
var memorySize int64
// 10% chance to allocate 1.6GB, 90% chance to allocate 128MB-1536MB
if rand.Float64() < 0.1 {
memorySize = int64(1600 * 1024 * 1024)
} else {
multiple := rand.Intn(12) + 1
memorySize = int64(multiple * 128 * 1024 * 1024) // 128MB to 1536MB
}
go func(id int64, size int64) {
defer wg.Done()
ma.BlockingAllocate(id, size)
time.Sleep(1 * time.Millisecond)
ma.Release(id, size)
}(taskID, memorySize)
}
wg.Wait()
// Assert that all memory is released
finalMemory := ma.(*memoryAllocator).usedMemory
assert.Equal(t, int64(0), finalMemory, "All memory should be released")
}