enhance: [2.5] Deprecate disk params for indexing (#41078)

issue: #40863 

master pr: #41045

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2025-04-07 11:36:34 +08:00 committed by GitHub
parent 23e579e324
commit 0db5e0c4f6
14 changed files with 11 additions and 253 deletions

View File

@@ -510,8 +510,6 @@ indexCoord:
 indexNode:
   scheduler:
     buildParallel: 1
-  enableDisk: true # enable index node build disk vector index
-  maxDiskUsagePercentage: 95
   ip: # TCP/IP address of indexNode. If not specified, use the first unicastable address
   port: 21121 # TCP port of indexNode
   grpc:
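
Note: the two keys removed above are deprecated server-side, and user config files that still contain them should keep loading, because a YAML config reader that only models the keys it knows about silently drops the rest. A minimal standalone sketch of that behavior (not Milvus code; yaml.v3 ignores unknown keys by default):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// indexNodeConf models only the keys the reader still cares about.
type indexNodeConf struct {
	Scheduler struct {
		BuildParallel int `yaml:"buildParallel"`
	} `yaml:"scheduler"`
	Port int `yaml:"port"`
}

func main() {
	data := []byte(`
scheduler:
  buildParallel: 1
enableDisk: true           # removed key: silently ignored
maxDiskUsagePercentage: 95 # removed key: silently ignored
port: 21121
`)
	var c indexNodeConf
	if err := yaml.Unmarshal(data, &c); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c) // the removed keys are simply dropped
}
```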

View File

@@ -30,7 +30,6 @@ import (
     "github.com/milvus-io/milvus/internal/metastore/model"
     "github.com/milvus-io/milvus/internal/parser/planparserv2"
     "github.com/milvus-io/milvus/internal/util/indexparamcheck"
-    "github.com/milvus-io/milvus/internal/util/vecindexmgr"
     "github.com/milvus-io/milvus/pkg/v2/common"
     pkgcommon "github.com/milvus-io/milvus/pkg/v2/common"
     "github.com/milvus-io/milvus/pkg/v2/log"
@@ -320,13 +319,6 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest
         }
     }
-    if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(GetIndexType(req.IndexParams)) && !s.indexNodeManager.ClientSupportDisk() {
-        errMsg := "all IndexNodes do not support disk indexes, please verify"
-        log.Warn(errMsg)
-        metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
-        return merr.Status(merr.WrapErrIndexNotSupported(GetIndexType(req.IndexParams))), nil
-    }

     allocatedIndexID, err := s.allocator.AllocID(ctx)
     if err != nil {
         log.Warn("failed to alloc indexID", zap.Error(err))

View File

@@ -44,7 +44,6 @@ import (
     "github.com/milvus-io/milvus/pkg/v2/common"
     "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
     "github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
-    "github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
     "github.com/milvus-io/milvus/pkg/v2/util/funcutil"
     "github.com/milvus-io/milvus/pkg/v2/util/merr"
     "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -259,7 +258,7 @@ func TestServer_CreateIndex(t *testing.T) {
         assert.Error(t, merr.CheckRPCCall(resp, err))
     })
-    t.Run("not support disk index", func(t *testing.T) {
+    t.Run("disk index", func(t *testing.T) {
         s.allocator = mock0Allocator
         s.meta.indexMeta.indexes = map[UniqueID]map[UniqueID]*model.Index{}
         req.IndexParams = []*commonpb.KeyValuePair{
@@ -270,7 +269,7 @@ func TestServer_CreateIndex(t *testing.T) {
         }
         s.indexNodeManager = session.NewNodeManager(ctx, defaultIndexNodeCreatorFunc)
         resp, err := s.CreateIndex(ctx, req)
-        assert.Error(t, merr.CheckRPCCall(resp, err))
+        assert.NoError(t, merr.CheckRPCCall(resp, err))
     })

     t.Run("disk index with mmap", func(t *testing.T) {
@@ -290,10 +289,6 @@ func TestServer_CreateIndex(t *testing.T) {
         s.indexNodeManager = nodeManager
         mockNode := mocks.NewMockIndexNodeClient(t)
         nodeManager.SetClient(1001, mockNode)
-        mockNode.EXPECT().GetJobStats(mock.Anything, mock.Anything).Return(&workerpb.GetJobStatsResponse{
-            Status:     merr.Success(),
-            EnableDisk: true,
-        }, nil)

         resp, err := s.CreateIndex(ctx, req)
         assert.Error(t, merr.CheckRPCCall(resp, err))

View File

@@ -153,7 +153,7 @@ type Server struct {
     // indexCoord             types.IndexCoord
     // segReferManager  *SegmentReferenceManager
-    indexNodeManager          *session.IndexNodeManager
+    indexNodeManager          session.WorkerManager
     indexEngineVersionManager IndexEngineVersionManager

     taskScheduler *taskScheduler
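
Widening this field from the concrete *session.IndexNodeManager to the session.WorkerManager interface is what lets tests inject a fake manager instead of a real one. A toy sketch of the pattern (illustrative names, not the Milvus types):

```go
package main

import "fmt"

// WorkerManager stands in for session.WorkerManager.
type WorkerManager interface {
	PickClient() string
}

type realManager struct{}

func (realManager) PickClient() string { return "indexnode-1" }

type mockManager struct{}

func (mockManager) PickClient() string { return "mock" }

// Server holds the interface, so production and test code can supply
// different implementations through the same field.
type Server struct {
	indexNodeManager WorkerManager
}

func main() {
	prod := Server{indexNodeManager: realManager{}}
	test := Server{indexNodeManager: mockManager{}}
	fmt.Println(prod.indexNodeManager.PickClient(), test.indexNodeManager.PickClient())
}
```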

View File

@@ -30,7 +30,6 @@ import (
     "github.com/milvus-io/milvus/pkg/v2/metrics"
     "github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
     "github.com/milvus-io/milvus/pkg/v2/util/lock"
-    "github.com/milvus-io/milvus/pkg/v2/util/merr"
     "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
     typeutil "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
@@ -45,7 +44,6 @@ type WorkerManager interface {
     StoppingNode(nodeID typeutil.UniqueID)
     PickClient() (typeutil.UniqueID, types.IndexNodeClient)
     QuerySlots() map[typeutil.UniqueID]*WorkerSlots
-    ClientSupportDisk() bool
     GetAllClients() map[typeutil.UniqueID]types.IndexNodeClient
     GetClientByID(nodeID typeutil.UniqueID) (types.IndexNodeClient, bool)
 }
@@ -215,57 +213,6 @@ func (nm *IndexNodeManager) PickClient() (typeutil.UniqueID, types.IndexNodeClient) {
     return 0, nil
 }

-func (nm *IndexNodeManager) ClientSupportDisk() bool {
-    log := log.Ctx(nm.ctx)
-    log.Debug("check if client support disk index")
-    allClients := nm.GetAllClients()
-    if len(allClients) == 0 {
-        log.Warn("there is no IndexNode online")
-        return false
-    }
-
-    // Note: In order to quickly end other goroutines, an error is returned when the client is successfully selected
-    ctx, cancel := context.WithCancel(nm.ctx)
-    var (
-        enableDisk = false
-        nodeMutex  = lock.Mutex{}
-        wg         = sync.WaitGroup{}
-    )
-
-    for nodeID, client := range allClients {
-        nodeID := nodeID
-        client := client
-        wg.Add(1)
-        go func() {
-            defer wg.Done()
-            resp, err := client.GetJobStats(ctx, &workerpb.GetJobStatsRequest{})
-            if err := merr.CheckRPCCall(resp, err); err != nil {
-                log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), zap.Error(err))
-                return
-            }
-            log.Debug("get job stats success", zap.Int64("nodeID", nodeID), zap.Bool("enable disk", resp.GetEnableDisk()))
-            if resp.GetEnableDisk() {
-                nodeMutex.Lock()
-                defer nodeMutex.Unlock()
-                cancel()
-                if !enableDisk {
-                    enableDisk = true
-                }
-                return
-            }
-        }()
-    }
-    wg.Wait()
-    cancel()
-    if enableDisk {
-        log.Info("IndexNode support disk index")
-        return true
-    }
-
-    log.Error("all IndexNodes do not support disk indexes")
-    return false
-}
-
 func (nm *IndexNodeManager) GetAllClients() map[typeutil.UniqueID]types.IndexNodeClient {
     nm.lock.RLock()
     defer nm.lock.RUnlock()
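
For reference, the deleted ClientSupportDisk fanned out one goroutine per node and cancelled the shared context as soon as any node reported disk support — that is what the slightly misleading "an error is returned when the client is successfully selected" comment described. A self-contained sketch of that first-success-wins pattern, with hypothetical names:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// anySupports asks every probe concurrently and cancels the rest as soon
// as one reports true, the same shape as the removed ClientSupportDisk.
func anySupports(parent context.Context, probes []func(context.Context) bool) bool {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	var (
		mu    sync.Mutex
		found bool
		wg    sync.WaitGroup
	)
	for _, probe := range probes {
		probe := probe
		wg.Add(1)
		go func() {
			defer wg.Done()
			if probe(ctx) {
				mu.Lock()
				defer mu.Unlock()
				found = true
				cancel() // tell the remaining probes to stop early
			}
		}()
	}
	wg.Wait()
	return found
}

func main() {
	probes := []func(context.Context) bool{
		func(context.Context) bool { return false },
		func(context.Context) bool { return true },
	}
	fmt.Println(anySupports(context.Background(), probes)) // true
}
```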

View File

@@ -27,7 +27,6 @@ import (
     "github.com/milvus-io/milvus/internal/mocks"
     "github.com/milvus-io/milvus/internal/types"
     "github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
-    "github.com/milvus-io/milvus/pkg/v2/util/lock"
     "github.com/milvus-io/milvus/pkg/v2/util/merr"
     "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
     typeutil "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -100,95 +99,6 @@ func TestIndexNodeManager_PickClient(t *testing.T) {
     })
 }

-func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
-    paramtable.Init()
-    getMockedGetJobStatsClient := func(resp *workerpb.GetJobStatsResponse, err error) types.IndexNodeClient {
-        ic := mocks.NewMockIndexNodeClient(t)
-        ic.EXPECT().GetJobStats(mock.Anything, mock.Anything, mock.Anything).Return(resp, err)
-        return ic
-    }
-
-    err := errors.New("error")
-
-    t.Run("support", func(t *testing.T) {
-        nm := &IndexNodeManager{
-            ctx:  context.Background(),
-            lock: lock.RWMutex{},
-            nodeClients: map[typeutil.UniqueID]types.IndexNodeClient{
-                1: getMockedGetJobStatsClient(&workerpb.GetJobStatsResponse{
-                    Status:         merr.Success(),
-                    AvailableSlots: 1,
-                    JobInfos:       nil,
-                    EnableDisk:     true,
-                }, nil),
-            },
-        }
-
-        support := nm.ClientSupportDisk()
-        assert.True(t, support)
-    })
-
-    t.Run("not support", func(t *testing.T) {
-        nm := &IndexNodeManager{
-            ctx:  context.Background(),
-            lock: lock.RWMutex{},
-            nodeClients: map[typeutil.UniqueID]types.IndexNodeClient{
-                1: getMockedGetJobStatsClient(&workerpb.GetJobStatsResponse{
-                    Status:         merr.Success(),
-                    AvailableSlots: 1,
-                    JobInfos:       nil,
-                    EnableDisk:     false,
-                }, nil),
-            },
-        }
-
-        support := nm.ClientSupportDisk()
-        assert.False(t, support)
-    })
-
-    t.Run("no indexnode", func(t *testing.T) {
-        nm := &IndexNodeManager{
-            ctx:         context.Background(),
-            lock:        lock.RWMutex{},
-            nodeClients: map[typeutil.UniqueID]types.IndexNodeClient{},
-        }
-
-        support := nm.ClientSupportDisk()
-        assert.False(t, support)
-    })
-
-    t.Run("error", func(t *testing.T) {
-        nm := &IndexNodeManager{
-            ctx:  context.Background(),
-            lock: lock.RWMutex{},
-            nodeClients: map[typeutil.UniqueID]types.IndexNodeClient{
-                1: getMockedGetJobStatsClient(nil, err),
-            },
-        }
-
-        support := nm.ClientSupportDisk()
-        assert.False(t, support)
-    })
-
-    t.Run("fail reason", func(t *testing.T) {
-        nm := &IndexNodeManager{
-            ctx:  context.Background(),
-            lock: lock.RWMutex{},
-            nodeClients: map[typeutil.UniqueID]types.IndexNodeClient{
-                1: getMockedGetJobStatsClient(&workerpb.GetJobStatsResponse{
-                    Status:         merr.Status(err),
-                    AvailableSlots: 0,
-                    JobInfos:       nil,
-                    EnableDisk:     false,
-                }, nil),
-            },
-        }
-
-        support := nm.ClientSupportDisk()
-        assert.False(t, support)
-    })
-}
-
 func TestNodeManager_StoppingNode(t *testing.T) {
     paramtable.Init()
     nm := NewNodeManager(context.Background(), defaultIndexNodeCreatorFunc)

View File

@@ -206,7 +206,7 @@ func (i *IndexNode) CloseSegcore() {
 }

 func (i *IndexNode) initSession() error {
-    i.session = sessionutil.NewSession(i.loopCtx, sessionutil.WithEnableDisk(Params.IndexNodeCfg.EnableDisk.GetAsBool()))
+    i.session = sessionutil.NewSession(i.loopCtx)
     if i.session == nil {
         return errors.New("failed to initialize session")
     }

View File

@@ -218,7 +218,6 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *workerpb.GetJobStatsRequest
         EnqueueJobNum:  int64(unissued),
         TotalSlots:     i.totalSlot,
         AvailableSlots: slots,
-        EnableDisk:     Params.IndexNodeCfg.EnableDisk.GetAsBool(),
     }, nil
 }
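
Because enable_disk is a proto3 scalar, the worker can simply stop setting it: the field stays in the schema (see the .proto change below) and an older reader of the deprecated field just observes its zero value. A minimal sketch, assuming the generated workerpb package from this repo:

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
)

func main() {
	// The response is built without EnableDisk; proto3 scalars default to
	// their zero value, so old coordinators reading it see false.
	resp := &workerpb.GetJobStatsResponse{
		TotalSlots:     8,
		AvailableSlots: 3,
	}
	fmt.Println(resp.GetEnableDisk()) // false
}
```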

View File

@@ -23,7 +23,6 @@ import (
     "strings"
     "time"

-    "github.com/cockroachdb/errors"
     "go.uber.org/zap"

     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -219,35 +218,13 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
     indexType := it.newIndexParams[common.IndexTypeKey]
     var fieldDataSize uint64
     var err error
     if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(indexType) {
-        // check index node support disk index
-        if !Params.IndexNodeCfg.EnableDisk.GetAsBool() {
-            log.Warn("IndexNode don't support build disk index",
-                zap.String("index type", it.newIndexParams[common.IndexTypeKey]),
-                zap.Bool("enable disk", Params.IndexNodeCfg.EnableDisk.GetAsBool()))
-            return errors.New("index node don't support build disk index")
-        }
-
-        // check load size and size of field data
-        localUsedSize, err := indexcgowrapper.GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue())
-        if err != nil {
-            log.Warn("IndexNode get local used size failed")
-            return err
-        }
         fieldDataSize, err = estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType())
         if err != nil {
             log.Warn("IndexNode get local used size failed")
             return err
         }
-        usedLocalSizeWhenBuild := int64(float64(fieldDataSize)*diskUsageRatio) + localUsedSize
-        maxUsedLocalSize := int64(Params.IndexNodeCfg.DiskCapacityLimit.GetAsFloat() * Params.IndexNodeCfg.MaxDiskUsagePercentage.GetAsFloat())
-
-        if usedLocalSizeWhenBuild > maxUsedLocalSize {
-            log.Warn("IndexNode don't has enough disk size to build disk ann index",
-                zap.Int64("usedLocalSizeWhenBuild", usedLocalSizeWhenBuild),
-                zap.Int64("maxUsedLocalSize", maxUsedLocalSize))
-            return errors.New("index node don't has enough disk size to build disk ann index")
-        }

         err = indexparams.SetDiskIndexBuildParams(it.newIndexParams, int64(fieldDataSize))
         if err != nil {
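
The deleted guard compared projected usage, int64(float64(fieldDataSize)*diskUsageRatio) + localUsedSize, against DiskCapacityLimit * MaxDiskUsagePercentage. A worked example of that arithmetic with made-up numbers (diskUsageRatio here is an illustrative value, not the package constant):

```go
package main

import "fmt"

func main() {
	const gib = int64(1) << 30
	var (
		fieldDataSize  = 2 * gib  // raw vector data to index
		diskUsageRatio = 4.0      // illustrative multiplier, not the real constant
		localUsedSize  = 10 * gib // already on local disk
		capacity       = 100 * gib
		maxPercentage  = 0.95 // the old default: indexNode.maxDiskUsagePercentage = 95
	)
	projected := int64(float64(fieldDataSize)*diskUsageRatio) + localUsedSize
	budget := int64(float64(capacity) * maxPercentage)
	// 2*4 + 10 = 18 GiB projected vs a 95 GiB budget: the build was allowed.
	fmt.Println(projected, budget, projected <= budget)
}
```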
@@ -313,8 +290,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
         PartitionKeyIsolation: it.req.GetPartitionKeyIsolation(),
     }

-    log.Info("debug create index", zap.Any("buildIndexParams", buildIndexParams))
-    var err error
+    log.Info("create index", zap.Any("buildIndexParams", buildIndexParams))
     it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexParams)
     if err != nil {
         if it.index != nil && it.index.CleanLocalData() != nil {

View File

@@ -103,7 +103,6 @@ type SessionRaw struct {
     LeaseID  *clientv3.LeaseID `json:"LeaseID,omitempty"`
     HostName string            `json:"HostName,omitempty"`
-    EnableDisk bool `json:"EnableDisk,omitempty"`

     ServerLabels map[string]string `json:"ServerLabels,omitempty"`
 }
@@ -192,12 +191,6 @@ func WithScalarIndexEngineVersion(minimal, current int32) SessionOption {
     }
 }

-func WithEnableDisk(enableDisk bool) SessionOption {
-    return func(s *Session) {
-        s.EnableDisk = enableDisk
-    }
-}
-
 func (s *Session) apply(opts ...SessionOption) {
     for _, opt := range opts {
         opt(s)
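
WithEnableDisk was a functional option, so removing it is source-compatible for every caller that stops passing it: NewSession's variadic signature does not change. A standalone sketch of the option pattern (illustrative types, not the sessionutil API):

```go
package main

import "fmt"

type Session struct {
	ServerName string
	EnableDisk bool // the removed WithEnableDisk option used to set this
}

type SessionOption func(*Session)

func WithServerName(name string) SessionOption {
	return func(s *Session) { s.ServerName = name }
}

// NewSession applies whatever options the caller passes; dropping one
// option from a call site needs no other change.
func NewSession(opts ...SessionOption) *Session {
	s := &Session{}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := NewSession(WithServerName("indexnode"))
	fmt.Printf("%+v\n", s)
}
```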

View File

@@ -101,6 +101,7 @@ message GetJobStatsResponse {
   int64 enqueue_job_num = 4;
   int64 available_slots = 5;
   repeated index.JobInfo job_infos = 6;
+  // deprecated
   bool enable_disk = 7;
   int64 total_slots = 8;
 }

View File

@@ -511,8 +511,9 @@ type GetJobStatsResponse struct {
     EnqueueJobNum  int64              `protobuf:"varint,4,opt,name=enqueue_job_num,json=enqueueJobNum,proto3" json:"enqueue_job_num,omitempty"`
     AvailableSlots int64              `protobuf:"varint,5,opt,name=available_slots,json=availableSlots,proto3" json:"available_slots,omitempty"`
     JobInfos       []*indexpb.JobInfo `protobuf:"bytes,6,rep,name=job_infos,json=jobInfos,proto3" json:"job_infos,omitempty"`
-    EnableDisk     bool               `protobuf:"varint,7,opt,name=enable_disk,json=enableDisk,proto3" json:"enable_disk,omitempty"`
-    TotalSlots     int64              `protobuf:"varint,8,opt,name=total_slots,json=totalSlots,proto3" json:"total_slots,omitempty"`
+    // deprecated
+    EnableDisk bool  `protobuf:"varint,7,opt,name=enable_disk,json=enableDisk,proto3" json:"enable_disk,omitempty"`
+    TotalSlots int64 `protobuf:"varint,8,opt,name=total_slots,json=totalSlots,proto3" json:"total_slots,omitempty"`
 }

 func (x *GetJobStatsResponse) Reset() {

View File

@@ -5077,12 +5077,7 @@ if this parameter <= 0, will set it as 10`,
 // /////////////////////////////////////////////////////////////////////////////
 // --- indexnode ---
 type indexNodeConfig struct {
-    BuildParallel ParamItem `refreshable:"false"`
-
-    // enable disk
-    EnableDisk             ParamItem `refreshable:"false"`
-    DiskCapacityLimit      ParamItem `refreshable:"true"`
-    MaxDiskUsagePercentage ParamItem `refreshable:"true"`
+    BuildParallel ParamItem `refreshable:"false"`

     GracefulStopTimeout ParamItem `refreshable:"true"`

     WorkerSlotUnit ParamItem `refreshable:"true"`
@@ -5097,52 +5092,6 @@ func (p *indexNodeConfig) init(base *BaseTable) {
     }
     p.BuildParallel.Init(base.mgr)

-    p.EnableDisk = ParamItem{
-        Key:          "indexNode.enableDisk",
-        Version:      "2.2.0",
-        DefaultValue: "false",
-        PanicIfEmpty: true,
-        Doc:          "enable index node build disk vector index",
-        Export:       true,
-    }
-    p.EnableDisk.Init(base.mgr)
-
-    p.DiskCapacityLimit = ParamItem{
-        Key:     "LOCAL_STORAGE_SIZE",
-        Version: "2.2.0",
-        Formatter: func(v string) string {
-            if len(v) == 0 {
-                // use local storage path to check correct device
-                localStoragePath := base.Get("localStorage.path")
-                if _, err := os.Stat(localStoragePath); os.IsNotExist(err) {
-                    if err := os.MkdirAll(localStoragePath, os.ModePerm); err != nil {
-                        log.Fatal("failed to mkdir", zap.String("localStoragePath", localStoragePath), zap.Error(err))
-                    }
-                }
-                diskUsage, err := disk.Usage(localStoragePath)
-                if err != nil {
-                    log.Fatal("failed to get disk usage", zap.String("localStoragePath", localStoragePath), zap.Error(err))
-                }
-                return strconv.FormatUint(diskUsage.Total, 10)
-            }
-            diskSize := getAsInt64(v)
-            return strconv.FormatInt(diskSize*1024*1024*1024, 10)
-        },
-    }
-    p.DiskCapacityLimit.Init(base.mgr)
-
-    p.MaxDiskUsagePercentage = ParamItem{
-        Key:          "indexNode.maxDiskUsagePercentage",
-        Version:      "2.2.0",
-        DefaultValue: "95",
-        PanicIfEmpty: true,
-        Formatter: func(v string) string {
-            return fmt.Sprintf("%f", getAsFloat(v)/100)
-        },
-        Export: true,
-    }
-    p.MaxDiskUsagePercentage.Init(base.mgr)
-
     p.GracefulStopTimeout = ParamItem{
         Key:     "indexNode.gracefulStopTimeout",
         Version: "2.2.1",
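
For the record, the removed DiskCapacityLimit formatter read LOCAL_STORAGE_SIZE as GiB and fell back to the total size of the device behind localStorage.path. A minimal sketch of that logic, assuming gopsutil/v3 (the library the deleted code used):

```go
package main

import (
	"fmt"
	"strconv"

	"github.com/shirou/gopsutil/v3/disk"
)

// capacityBytes mirrors the removed formatter: an explicit value is taken
// as GiB; an empty value falls back to the total size of the device
// backing path.
func capacityBytes(value, path string) (int64, error) {
	if value == "" {
		usage, err := disk.Usage(path)
		if err != nil {
			return 0, err
		}
		return int64(usage.Total), nil
	}
	gib, err := strconv.ParseInt(value, 10, 64)
	if err != nil {
		return 0, err
	}
	return gib * 1024 * 1024 * 1024, nil
}

func main() {
	n, err := capacityBytes("100", "/tmp")
	fmt.Println(n, err) // 107374182400 <nil>
}
```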

View File

@@ -674,9 +674,6 @@ func TestCachedParam(t *testing.T) {
     Init()
     params := Get()

-    assert.True(t, params.IndexNodeCfg.EnableDisk.GetAsBool())
-    assert.True(t, params.IndexNodeCfg.EnableDisk.GetAsBool())
-
     assert.Equal(t, 256*1024*1024, params.QueryCoordGrpcServerCfg.ServerMaxRecvSize.GetAsInt())
     assert.Equal(t, 256*1024*1024, params.QueryCoordGrpcServerCfg.ServerMaxRecvSize.GetAsInt())