fix: predict inverted index resource usage more reasonably (#31615) (#31641)

/kind improvement
issue: https://github.com/milvus-io/milvus/issues/31617
pr: #31615

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
pull/31671/head
Jiquan Long 2024-03-27 21:07:11 +08:00 committed by GitHub
parent ad07289819
commit d37e1fdd9f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 45 additions and 10 deletions

View File

@ -27,7 +27,8 @@
bool
IsLoadWithDisk(const char* index_type, int index_engine_version) {
return knowhere::UseDiskLoad(index_type, index_engine_version);
return knowhere::UseDiskLoad(index_type, index_engine_version) ||
strcmp(index_type, milvus::index::INVERTED_INDEX_TYPE) == 0;
}
CStatus

View File

@ -42,6 +42,7 @@
#include "expr/ITypeExpr.h"
#include "plan/PlanNode.h"
#include "exec/expression/Expr.h"
#include "segcore/load_index_c.h"
namespace chrono = std::chrono;
@ -5272,4 +5273,8 @@ TEST(CApiTest, RANGE_SEARCH_WITH_RADIUS_AND_RANGE_FILTER_WHEN_IP_BFLOAT16) {
DeleteSearchResult(search_result);
DeleteCollection(c_collection);
DeleteSegment(segment);
}
TEST(CApiTest, IsLoadWithDisk) {
ASSERT_TRUE(IsLoadWithDisk(INVERTED_INDEX_TYPE, 0));
}

View File

@ -44,6 +44,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/contextutil"
"github.com/milvus-io/milvus/pkg/util/crypto"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -65,10 +66,10 @@ const (
defaultMaxSearchRequest = 1024
// DefaultArithmeticIndexType name of default index type for scalar field
DefaultArithmeticIndexType = "INVERTED"
DefaultArithmeticIndexType = indexparamcheck.IndexINVERTED
// DefaultStringIndexType name of default index type for varChar/string field
DefaultStringIndexType = "INVERTED"
DefaultStringIndexType = indexparamcheck.IndexINVERTED
defaultRRFParamsValue = 60
maxRRFParamsValue = 16384

View File

@ -27,6 +27,7 @@ import (
"fmt"
"unsafe"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/conc"
@ -54,7 +55,7 @@ func NewIndexAttrCache() *IndexAttrCache {
}
}
func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo, memoryIndexLoadPredictMemoryUsageFactor float64) (memory uint64, disk uint64, err error) {
func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo, memoryIndexLoadPredictMemoryUsageFactor float64, fieldBinlog *datapb.FieldBinlog) (memory uint64, disk uint64, err error) {
indexType, err := funcutil.GetAttrByKeyFromRepeatedKV(common.IndexTypeKey, indexInfo.IndexParams)
if err != nil {
return 0, 0, fmt.Errorf("index type not exist in index params")
@ -64,6 +65,12 @@ func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo
neededDiskSize := indexInfo.IndexSize - neededMemSize
return uint64(neededMemSize), uint64(neededDiskSize), nil
}
if indexType == indexparamcheck.IndexINVERTED {
neededMemSize := 0
// we will mmap the binlog if the index type is inverted index.
neededDiskSize := indexInfo.IndexSize + getBinlogDataSize(fieldBinlog)
return uint64(neededMemSize), uint64(neededDiskSize), nil
}
engineVersion := indexInfo.GetCurrentIndexVersion()
isLoadWithDisk, has := c.loadWithDisk.Get(typeutil.NewPair(indexType, engineVersion))

View File

@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
@ -51,7 +52,7 @@ func (s *IndexAttrCacheSuite) TestCacheMissing() {
CurrentIndexVersion: 0,
}
_, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
_, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil)
s.Require().NoError(err)
_, has := s.c.loadWithDisk.Get(typeutil.NewPair[string, int32]("test", 0))
@ -67,7 +68,7 @@ func (s *IndexAttrCacheSuite) TestDiskANN() {
IndexSize: 100,
}
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil)
s.Require().NoError(err)
_, has := s.c.loadWithDisk.Get(typeutil.NewPair[string, int32](indexparamcheck.IndexDISKANN, 0))
@ -77,6 +78,26 @@ func (s *IndexAttrCacheSuite) TestDiskANN() {
s.EqualValues(75, disk)
}
func (s *IndexAttrCacheSuite) TestInvertedIndex() {
info := &querypb.FieldIndexInfo{
IndexParams: []*commonpb.KeyValuePair{
{Key: common.IndexTypeKey, Value: indexparamcheck.IndexINVERTED},
},
CurrentIndexVersion: 0,
IndexSize: 50,
}
binlog := &datapb.FieldBinlog{
Binlogs: []*datapb.Binlog{
{LogSize: 60},
},
}
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), binlog)
s.Require().NoError(err)
s.EqualValues(uint64(0), memory)
s.EqualValues(uint64(110), disk)
}
func (s *IndexAttrCacheSuite) TestLoadWithDisk() {
info := &querypb.FieldIndexInfo{
IndexParams: []*commonpb.KeyValuePair{
@ -88,7 +109,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() {
s.Run("load_with_disk", func() {
s.c.loadWithDisk.Insert(typeutil.NewPair[string, int32]("test", 0), true)
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil)
s.Require().NoError(err)
s.EqualValues(100, memory)
@ -97,7 +118,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() {
s.Run("load_with_disk", func() {
s.c.loadWithDisk.Insert(typeutil.NewPair[string, int32]("test", 0), false)
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil)
s.Require().NoError(err)
s.Equal(uint64(250), memory)
@ -109,7 +130,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() {
IndexParams: []*commonpb.KeyValuePair{},
}
_, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
_, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), nil)
s.Error(err)
})
}

View File

@ -1459,7 +1459,7 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
var mmapEnabled bool
if fieldIndexInfo, ok := vecFieldID2IndexInfo[fieldID]; ok {
mmapEnabled = isIndexMmapEnable(fieldIndexInfo)
neededMemSize, neededDiskSize, err := getIndexAttrCache().GetIndexResourceUsage(fieldIndexInfo, multiplyFactor.memoryIndexUsageFactor)
neededMemSize, neededDiskSize, err := getIndexAttrCache().GetIndexResourceUsage(fieldIndexInfo, multiplyFactor.memoryIndexUsageFactor, fieldBinlog)
if err != nil {
return nil, errors.Wrapf(err, "failed to get index size collection %d, segment %d, indexBuildID %d",
loadInfo.GetCollectionID(),