mirror of https://github.com/milvus-io/milvus.git
enhance: prevent multiple query nodes from causing excessive occupancy of a single node, leading to GPU memory overflow (#39276) (#38617)
issue: #39276
Signed-off-by: yusheng.ma <yusheng.ma@zilliz.com>
branch: pull/39307/head
parent
0df2c75b77
commit
38881bf591
2
Makefile
2
Makefile
|
@ -95,7 +95,7 @@ milvus-gpu: build-cpp-gpu print-gpu-build-info
|
||||||
@source $(PWD)/scripts/setenv.sh && \
|
@source $(PWD)/scripts/setenv.sh && \
|
||||||
mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && \
|
mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && \
|
||||||
CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CFLAGS="$(CGO_CFLAGS)" GO111MODULE=on $(GO) build -pgo=$(PGO_PATH)/default.pgo -ldflags="-r $${RPATH} -X '$(OBJPREFIX).BuildTags=$(BUILD_TAGS_GPU)' -X '$(OBJPREFIX).BuildTime=$(BUILD_TIME)' -X '$(OBJPREFIX).GitCommit=$(GIT_COMMIT)' -X '$(OBJPREFIX).GoVersion=$(GO_VERSION)'" \
|
CGO_LDFLAGS="$(CGO_LDFLAGS)" CGO_CFLAGS="$(CGO_CFLAGS)" GO111MODULE=on $(GO) build -pgo=$(PGO_PATH)/default.pgo -ldflags="-r $${RPATH} -X '$(OBJPREFIX).BuildTags=$(BUILD_TAGS_GPU)' -X '$(OBJPREFIX).BuildTime=$(BUILD_TIME)' -X '$(OBJPREFIX).GitCommit=$(GIT_COMMIT)' -X '$(OBJPREFIX).GoVersion=$(GO_VERSION)'" \
|
||||||
-tags $(MILVUS_GO_BUILD_TAGS) -o $(INSTALL_PATH)/milvus $(PWD)/cmd/main.go 1>/dev/null
|
-tags "$(MILVUS_GO_BUILD_TAGS),cuda" -o $(INSTALL_PATH)/milvus $(PWD)/cmd/main.go 1>/dev/null
|
||||||
|
|
||||||
get-build-deps:
|
get-build-deps:
|
||||||
@(env bash $(PWD)/scripts/install_deps.sh)
|
@(env bash $(PWD)/scripts/install_deps.sh)
|
||||||
|
|
|
@ -1101,6 +1101,7 @@ trace:
|
||||||
gpu:
|
gpu:
|
||||||
initMemSize: 2048 # Gpu Memory Pool init size
|
initMemSize: 2048 # Gpu Memory Pool init size
|
||||||
maxMemSize: 4096 # Gpu Memory Pool Max size
|
maxMemSize: 4096 # Gpu Memory Pool Max size
|
||||||
|
overloadedMemoryThresholdPercentage: 95
|
||||||
|
|
||||||
# Any configuration related to the streaming node server.
|
# Any configuration related to the streaming node server.
|
||||||
streamingNode:
|
streamingNode:
|
||||||
|
|
|
@ -31,9 +31,10 @@ import (
|
||||||
|
|
||||||
// ResourceUsage is used to estimate the resource usage of a sealed segment.
type ResourceUsage struct {
	MemorySize     uint64 // estimated main (CPU) memory cost in bytes, index memory included
	DiskSize       uint64 // estimated local disk cost in bytes
	MmapFieldCount int    // number of fields that will be mmap-ed rather than fully loaded
	// FieldGpuMemorySize holds, per GPU-indexed field, the estimated GPU memory
	// cost in bytes. Empty when the segment has no GPU vector indexes.
	FieldGpuMemorySize []uint64
}
|
||||||
|
|
||||||
// Segment is the interface of a segment implementation.
|
// Segment is the interface of a segment implementation.
|
||||||
|
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"path"
|
"path"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -44,6 +45,7 @@ import (
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
|
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
|
||||||
"github.com/milvus-io/milvus/internal/storage"
|
"github.com/milvus-io/milvus/internal/storage"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/vecindexmgr"
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/metrics"
|
"github.com/milvus-io/milvus/pkg/metrics"
|
||||||
|
@ -1384,6 +1386,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
|
||||||
maxSegmentSize := uint64(0)
|
maxSegmentSize := uint64(0)
|
||||||
predictMemUsage := memUsage
|
predictMemUsage := memUsage
|
||||||
predictDiskUsage := diskUsage
|
predictDiskUsage := diskUsage
|
||||||
|
var predictGpuMemUsage []uint64
|
||||||
mmapFieldCount := 0
|
mmapFieldCount := 0
|
||||||
for _, loadInfo := range segmentLoadInfos {
|
for _, loadInfo := range segmentLoadInfos {
|
||||||
collection := loader.manager.Collection.Get(loadInfo.GetCollectionID())
|
collection := loader.manager.Collection.Get(loadInfo.GetCollectionID())
|
||||||
|
@ -1406,6 +1409,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
|
||||||
mmapFieldCount += usage.MmapFieldCount
|
mmapFieldCount += usage.MmapFieldCount
|
||||||
predictDiskUsage += usage.DiskSize
|
predictDiskUsage += usage.DiskSize
|
||||||
predictMemUsage += usage.MemorySize
|
predictMemUsage += usage.MemorySize
|
||||||
|
predictGpuMemUsage = usage.FieldGpuMemorySize
|
||||||
if usage.MemorySize > maxSegmentSize {
|
if usage.MemorySize > maxSegmentSize {
|
||||||
maxSegmentSize = usage.MemorySize
|
maxSegmentSize = usage.MemorySize
|
||||||
}
|
}
|
||||||
|
@ -1440,6 +1444,10 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
|
||||||
paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()))
|
paramtable.Get().QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err := checkSegmentGpuMemSize(predictGpuMemUsage, float32(paramtable.Get().GpuConfig.OverloadedMemoryThresholdPercentage.GetAsFloat()))
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
return predictMemUsage - memUsage, predictDiskUsage - diskUsage, nil
|
return predictMemUsage - memUsage, predictDiskUsage - diskUsage, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1448,6 +1456,7 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
|
||||||
var segmentMemorySize, segmentDiskSize uint64
|
var segmentMemorySize, segmentDiskSize uint64
|
||||||
var indexMemorySize uint64
|
var indexMemorySize uint64
|
||||||
var mmapFieldCount int
|
var mmapFieldCount int
|
||||||
|
var fieldGpuMemorySize []uint64
|
||||||
|
|
||||||
fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
|
fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
|
||||||
for _, fieldIndexInfo := range loadInfo.IndexInfos {
|
for _, fieldIndexInfo := range loadInfo.IndexInfos {
|
||||||
|
@ -1492,9 +1501,11 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
|
||||||
loadInfo.GetSegmentID(),
|
loadInfo.GetSegmentID(),
|
||||||
fieldIndexInfo.GetBuildID())
|
fieldIndexInfo.GetBuildID())
|
||||||
}
|
}
|
||||||
|
|
||||||
indexMemorySize += estimateResult.MaxMemoryCost
|
indexMemorySize += estimateResult.MaxMemoryCost
|
||||||
segmentDiskSize += estimateResult.MaxDiskCost
|
segmentDiskSize += estimateResult.MaxDiskCost
|
||||||
|
if vecindexmgr.GetVecIndexMgrInstance().IsGPUVecIndex(common.GetIndexType(fieldIndexInfo.IndexParams)) {
|
||||||
|
fieldGpuMemorySize = append(fieldGpuMemorySize, estimateResult.MaxMemoryCost)
|
||||||
|
}
|
||||||
if !estimateResult.HasRawData && !isVectorType {
|
if !estimateResult.HasRawData && !isVectorType {
|
||||||
shouldCalculateDataSize = true
|
shouldCalculateDataSize = true
|
||||||
}
|
}
|
||||||
|
@ -1555,9 +1566,10 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
|
||||||
segmentMemorySize += uint64(float64(memSize) * expansionFactor)
|
segmentMemorySize += uint64(float64(memSize) * expansionFactor)
|
||||||
}
|
}
|
||||||
return &ResourceUsage{
|
return &ResourceUsage{
|
||||||
MemorySize: segmentMemorySize + indexMemorySize,
|
MemorySize: segmentMemorySize + indexMemorySize,
|
||||||
DiskSize: segmentDiskSize,
|
DiskSize: segmentDiskSize,
|
||||||
MmapFieldCount: mmapFieldCount,
|
MmapFieldCount: mmapFieldCount,
|
||||||
|
FieldGpuMemorySize: fieldGpuMemorySize,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1680,3 +1692,39 @@ func getBinlogDataMemorySize(fieldBinlog *datapb.FieldBinlog) int64 {
|
||||||
|
|
||||||
return fieldSize
|
return fieldSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkSegmentGpuMemSize(fieldGpuMemSizeList []uint64, OverloadedMemoryThresholdPercentage float32) error {
|
||||||
|
gpuInfos, err := hardware.GetAllGPUMemoryInfo()
|
||||||
|
if err != nil {
|
||||||
|
if len(fieldGpuMemSizeList) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var usedGpuMem []uint64
|
||||||
|
var maxGpuMemSize []uint64
|
||||||
|
for _, gpuInfo := range gpuInfos {
|
||||||
|
usedGpuMem = append(usedGpuMem, gpuInfo.TotalMemory-gpuInfo.FreeMemory)
|
||||||
|
maxGpuMemSize = append(maxGpuMemSize, uint64(float32(gpuInfo.TotalMemory)*OverloadedMemoryThresholdPercentage))
|
||||||
|
}
|
||||||
|
currentGpuMem := usedGpuMem
|
||||||
|
for _, fieldGpuMem := range fieldGpuMemSizeList {
|
||||||
|
var minId int = -1
|
||||||
|
var minGpuMem uint64 = math.MaxUint64
|
||||||
|
for i := int(0); i < len(gpuInfos); i++ {
|
||||||
|
GpuiMem := currentGpuMem[i] + fieldGpuMem
|
||||||
|
if GpuiMem < maxGpuMemSize[i] && GpuiMem < minGpuMem {
|
||||||
|
minId = i
|
||||||
|
minGpuMem = GpuiMem
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if minId == -1 {
|
||||||
|
return fmt.Errorf("load segment failed, GPU OOM if loaded, GpuMemUsage(bytes) = %v, usedGpuMem(bytes) = %v, maxGPUMem(bytes) = %v",
|
||||||
|
fieldGpuMem,
|
||||||
|
usedGpuMem,
|
||||||
|
maxGpuMemSize)
|
||||||
|
}
|
||||||
|
currentGpuMem[minId] += minGpuMem
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
//go:build !cuda
|
||||||
|
// +build !cuda
|
||||||
|
|
||||||
|
package hardware
|
||||||
|
|
||||||
|
import "github.com/cockroachdb/errors"
|
||||||
|
|
||||||
|
// GPUMemoryInfo describes the memory capacity of a single GPU device.
type GPUMemoryInfo struct {
	TotalMemory uint64 // total device memory in bytes
	FreeMemory  uint64 // currently free device memory in bytes
}

// GetAllGPUMemoryInfo is the non-CUDA stub. Binaries built without the `cuda`
// tag have no GPU support, so it always returns an error and no devices.
func GetAllGPUMemoryInfo() ([]GPUMemoryInfo, error) {
	err := errors.New("CUDA not supported: failed to retrieve GPU memory info or no GPUs found")
	return nil, err
}
|
|
@ -0,0 +1,90 @@
|
||||||
|
//go:build cuda
|
||||||
|
// +build cuda
|
||||||
|
|
||||||
|
package hardware
|
||||||
|
|
||||||
|
/*
#cgo CFLAGS: -I/usr/local/cuda/include
#cgo LDFLAGS: -L/usr/local/cuda/lib64 -lcudart
#include <cuda_runtime.h>
#include <stdlib.h>

// Structure to store GPU memory info
typedef struct {
	size_t totalMemory;
	size_t freeMemory;
} GPUMemoryInfo;

// Function to get memory info for all GPUs.
// On success it malloc()s an array of deviceCount entries into *infos (caller
// frees) and returns deviceCount. Returns 0 on any failure; in that case
// *infos is left untouched (NULL from the Go side), so no free is needed.
int getAllGPUMemoryInfo(GPUMemoryInfo** infos) {
	int deviceCount = 0;
	cudaError_t err = cudaGetDeviceCount(&deviceCount);
	if (err != cudaSuccess || deviceCount == 0) {
		return 0; // No GPUs found or error occurred
	}

	// Allocate memory for the output array
	*infos = (GPUMemoryInfo*)malloc(deviceCount * sizeof(GPUMemoryInfo));
	if (*infos == NULL) {
		return 0; // Memory allocation failed
	}

	for (int i = 0; i < deviceCount; ++i) {
		if (cudaSetDevice(i) != cudaSuccess) {
			// Per-device failures are non-fatal: report the device with zeroed
			// memory figures rather than failing the whole query.
			(*infos)[i].totalMemory = 0;
			(*infos)[i].freeMemory = 0;
			continue; // Skip if the device cannot be set
		}

		size_t freeMem = 0, totalMem = 0;
		if (cudaMemGetInfo(&freeMem, &totalMem) != cudaSuccess) {
			(*infos)[i].totalMemory = 0;
			(*infos)[i].freeMemory = 0;
			continue; // Skip if memory info cannot be fetched
		}

		(*infos)[i].totalMemory = totalMem;
		(*infos)[i].freeMemory = freeMem;
	}

	return deviceCount; // Return the number of devices processed
}
*/
import "C"

import (
	"github.com/cockroachdb/errors"

	"unsafe"
)

// GPUMemoryInfo represents a single GPU's memory information.
type GPUMemoryInfo struct {
	TotalMemory uint64 // Total memory in bytes
	FreeMemory  uint64 // Free memory in bytes
}

// GetAllGPUMemoryInfo retrieves the memory information for all available GPUs.
// It returns a slice of GPUMemoryInfo and an error if no GPUs are found or retrieval fails.
// NOTE(review): devices that fail cudaSetDevice/cudaMemGetInfo are still included
// with zeroed TotalMemory/FreeMemory — callers computing capacity should tolerate
// zero-sized entries.
func GetAllGPUMemoryInfo() ([]GPUMemoryInfo, error) {
	var infos *C.GPUMemoryInfo

	// Call the C function to retrieve GPU memory info
	deviceCount := int(C.getAllGPUMemoryInfo(&infos))
	if deviceCount == 0 {
		// The C side only allocates after a successful device count, so infos
		// is still NULL here and nothing needs to be freed.
		return nil, errors.New("failed to retrieve GPU memory info or no GPUs found")
	}
	defer C.free(unsafe.Pointer(infos)) // Free the allocated memory

	// Convert C array to Go slice: reinterpret the C pointer as a huge fixed-size
	// array and reslice to the actual count (standard cgo idiom; no copy yet).
	gpuInfos := make([]GPUMemoryInfo, 0, deviceCount)
	infoArray := (*[1 << 30]C.GPUMemoryInfo)(unsafe.Pointer(infos))[:deviceCount:deviceCount]

	for i := 0; i < deviceCount; i++ {
		info := infoArray[i]
		gpuInfos = append(gpuInfos, GPUMemoryInfo{
			TotalMemory: uint64(info.totalMemory),
			FreeMemory:  uint64(info.freeMemory),
		})
	}

	return gpuInfos, nil
}
|
|
@ -970,8 +970,9 @@ This helps Milvus-CDC synchronize incremental data`,
|
||||||
}
|
}
|
||||||
|
|
||||||
// gpuConfig groups the GPU-related paramtable settings (gpu.* keys in the
// milvus.yaml configuration).
type gpuConfig struct {
	InitSize ParamItem `refreshable:"false"` // gpu.initMemSize: GPU memory pool initial size (MB)
	MaxSize  ParamItem `refreshable:"false"` // gpu.maxMemSize: GPU memory pool maximum size (MB)
	// gpu.overloadedMemoryThresholdPercentage: percentage of total GPU memory
	// usable before a node is considered overloaded; the formatter converts it
	// to a fraction (e.g. "95" -> 0.95).
	OverloadedMemoryThresholdPercentage ParamItem `refreshable:"false"`
}
|
||||||
|
|
||||||
func (t *gpuConfig) init(base *BaseTable) {
|
func (t *gpuConfig) init(base *BaseTable) {
|
||||||
|
@ -992,6 +993,16 @@ func (t *gpuConfig) init(base *BaseTable) {
|
||||||
DefaultValue: "4096",
|
DefaultValue: "4096",
|
||||||
}
|
}
|
||||||
t.MaxSize.Init(base.mgr)
|
t.MaxSize.Init(base.mgr)
|
||||||
|
t.OverloadedMemoryThresholdPercentage = ParamItem{
|
||||||
|
Key: "gpu.overloadedMemoryThresholdPercentage",
|
||||||
|
Version: "2.5.4",
|
||||||
|
Export: true,
|
||||||
|
DefaultValue: "95",
|
||||||
|
Formatter: func(v string) string {
|
||||||
|
return fmt.Sprintf("%f", getAsFloat(v)/100)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
t.OverloadedMemoryThresholdPercentage.Init(base.mgr)
|
||||||
}
|
}
|
||||||
|
|
||||||
type traceConfig struct {
|
type traceConfig struct {
|
||||||
|
|
Loading…
Reference in New Issue