mirror of https://github.com/milvus-io/milvus.git
enhance: provide more general configuration to control mmap behavior (#35359)
- issue: #35273
Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/35605/head
parent
99f8c64c31
commit
731d45abbe
|
@ -84,7 +84,7 @@ func (m *MmapMigration) MigrateIndexCoordCollection(ctx context.Context) {
|
|||
|
||||
alteredIndexes := make([]*model.Index, 0)
|
||||
for _, index := range fieldIndexes {
|
||||
if !indexparamcheck.IsMmapSupported(getIndexType(index.IndexParams)) {
|
||||
if !indexparamcheck.IsVectorMmapIndex(getIndexType(index.IndexParams)) {
|
||||
continue
|
||||
}
|
||||
fmt.Printf("migrate index, collection:%v, indexId: %v, indexName: %s\n", index.CollectionID, index.IndexID, index.IndexName)
|
||||
|
|
|
@ -398,7 +398,6 @@ queryNode:
|
|||
enableDisk: false # enable querynode load disk index, and search on disk index
|
||||
maxDiskUsagePercentage: 95
|
||||
cache:
|
||||
enabled: true
|
||||
memoryLimit: 2147483648 # 2 GB, 2 * 1024 * 1024 * 1024
|
||||
readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed`
|
||||
# options: async, sync, disable.
|
||||
|
@ -409,7 +408,10 @@ queryNode:
|
|||
# 2. If set to "disable" original vector data will only be loaded into the chunk cache during search/query.
|
||||
warmup: disable
|
||||
mmap:
|
||||
mmapEnabled: false # Enable mmap for loading data
|
||||
vectorField: false # Enable mmap for loading vector data
|
||||
vectorIndex: false # Enable mmap for loading vector index
|
||||
scalarField: false # Enable mmap for loading scalar data
|
||||
scalarIndex: false # Enable mmap for loading scalar index
|
||||
growingMmapEnabled: false # Enable mmap for using in growing raw data
|
||||
fixedFileSizeForMmapAlloc: 1 # tmp file size for mmap chunk manager
|
||||
maxDiskUsagePercentageForMmapAlloc: 50 # disk percentage used in mmap chunk manager
|
||||
|
|
2
go.mod
2
go.mod
|
@ -17,7 +17,7 @@ require (
|
|||
github.com/gin-gonic/gin v1.9.1
|
||||
github.com/go-playground/validator/v10 v10.14.0
|
||||
github.com/gofrs/flock v0.8.1
|
||||
github.com/golang/protobuf v1.5.4 // indirect
|
||||
github.com/golang/protobuf v1.5.4
|
||||
github.com/google/btree v1.1.2
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
|
||||
github.com/klauspost/compress v1.17.7
|
||||
|
|
|
@ -19,7 +19,6 @@ package datacoord
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/samber/lo"
|
||||
|
@ -28,9 +27,9 @@ import (
|
|||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
||||
|
@ -271,22 +270,15 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
|
|||
}
|
||||
|
||||
func ValidateIndexParams(index *model.Index) error {
|
||||
for _, paramSet := range [][]*commonpb.KeyValuePair{index.IndexParams, index.UserIndexParams} {
|
||||
for _, param := range paramSet {
|
||||
switch param.GetKey() {
|
||||
case common.MmapEnabledKey:
|
||||
indexType := GetIndexType(index.IndexParams)
|
||||
if !indexparamcheck.IsMmapSupported(indexType) {
|
||||
return merr.WrapErrParameterInvalidMsg("index type %s does not support mmap", indexType)
|
||||
}
|
||||
|
||||
if _, err := strconv.ParseBool(param.GetValue()); err != nil {
|
||||
return merr.WrapErrParameterInvalidMsg("invalid %s value: %s, expected: true, false", param.GetKey(), param.GetValue())
|
||||
}
|
||||
}
|
||||
}
|
||||
indexType := GetIndexType(index.IndexParams)
|
||||
indexParams := funcutil.KeyValuePair2Map(index.IndexParams)
|
||||
if err := indexparamcheck.ValidateMmapIndexParams(indexType, indexParams); err != nil {
|
||||
return merr.WrapErrParameterInvalidMsg("invalid mmap index params", err.Error())
|
||||
}
|
||||
userIndexParams := funcutil.KeyValuePair2Map(index.UserIndexParams)
|
||||
if err := indexparamcheck.ValidateMmapIndexParams(indexType, userIndexParams); err != nil {
|
||||
return merr.WrapErrParameterInvalidMsg("invalid mmap user index params", err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -639,7 +639,9 @@ func TestServer_AlterIndex(t *testing.T) {
|
|||
Timestamp: createTS,
|
||||
})
|
||||
assert.NoError(t, merr.CheckRPCCall(describeResp, err))
|
||||
assert.True(t, common.IsMmapEnabled(describeResp.IndexInfos[0].GetUserIndexParams()...), "indexInfo: %+v", describeResp.IndexInfos[0])
|
||||
enableMmap, ok := common.IsMmapDataEnabled(describeResp.IndexInfos[0].GetUserIndexParams()...)
|
||||
assert.True(t, enableMmap, "indexInfo: %+v", describeResp.IndexInfos[0])
|
||||
assert.True(t, ok)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -2397,3 +2399,58 @@ func TestMeta_GetHasUnindexTaskSegments(t *testing.T) {
|
|||
assert.Equal(t, 0, len(segments))
|
||||
})
|
||||
}
|
||||
|
||||
func TestValidateIndexParams(t *testing.T) {
|
||||
t.Run("valid", func(t *testing.T) {
|
||||
index := &model.Index{
|
||||
IndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.IndexTypeKey,
|
||||
Value: indexparamcheck.AutoIndex,
|
||||
},
|
||||
{
|
||||
Key: common.MmapEnabledKey,
|
||||
Value: "true",
|
||||
},
|
||||
},
|
||||
}
|
||||
err := ValidateIndexParams(index)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("invalid index param", func(t *testing.T) {
|
||||
index := &model.Index{
|
||||
IndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.IndexTypeKey,
|
||||
Value: indexparamcheck.AutoIndex,
|
||||
},
|
||||
{
|
||||
Key: common.MmapEnabledKey,
|
||||
Value: "h",
|
||||
},
|
||||
},
|
||||
}
|
||||
err := ValidateIndexParams(index)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("invalid index user param", func(t *testing.T) {
|
||||
index := &model.Index{
|
||||
IndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.IndexTypeKey,
|
||||
Value: indexparamcheck.AutoIndex,
|
||||
},
|
||||
},
|
||||
UserIndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.MmapEnabledKey,
|
||||
Value: "h",
|
||||
},
|
||||
},
|
||||
}
|
||||
err := ValidateIndexParams(index)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1229,7 +1229,8 @@ func TestProxy(t *testing.T) {
|
|||
err = merr.CheckRPCCall(resp, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, floatIndexName, resp.IndexDescriptions[0].IndexName)
|
||||
assert.True(t, common.IsMmapEnabled(resp.IndexDescriptions[0].GetParams()...), "params: %+v", resp.IndexDescriptions[0])
|
||||
enableMmap, _ := common.IsMmapDataEnabled(resp.IndexDescriptions[0].GetParams()...)
|
||||
assert.True(t, enableMmap, "params: %+v", resp.IndexDescriptions[0])
|
||||
|
||||
// disable mmap then the tests below could continue
|
||||
req := &milvuspb.AlterIndexRequest{
|
||||
|
|
|
@ -38,6 +38,7 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
|
@ -360,7 +361,8 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
// validate dense vector field type parameters
|
||||
if typeutil.IsVectorType(field.DataType) {
|
||||
isVectorType := typeutil.IsVectorType(field.DataType)
|
||||
if isVectorType {
|
||||
err = validateDimension(field)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -382,6 +384,11 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
}
|
||||
// TODO should remove the index params in the field schema
|
||||
indexParams := funcutil.KeyValuePair2Map(field.GetIndexParams())
|
||||
if err = ValidateAutoIndexMmapConfig(isVectorType, indexParams); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := validateMultipleVectorFields(t.schema); err != nil {
|
||||
|
|
|
@ -149,8 +149,16 @@ func (cit *createIndexTask) parseIndexParams() error {
|
|||
}
|
||||
}
|
||||
|
||||
if err := ValidateAutoIndexMmapConfig(isVecIndex, indexParamsMap); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
specifyIndexType, exist := indexParamsMap[common.IndexTypeKey]
|
||||
if exist && specifyIndexType != "" {
|
||||
if err := indexparamcheck.ValidateMmapIndexParams(specifyIndexType, indexParamsMap); err != nil {
|
||||
log.Ctx(cit.ctx).Warn("Invalid mmap type params", zap.String(common.IndexTypeKey, specifyIndexType), zap.Error(err))
|
||||
return merr.WrapErrParameterInvalidMsg("invalid mmap type params", err.Error())
|
||||
}
|
||||
checker, err := indexparamcheck.GetIndexCheckerMgrInstance().GetChecker(specifyIndexType)
|
||||
// not enable hybrid index for user, used in milvus internally
|
||||
if err != nil || indexparamcheck.IsHYBRIDChecker(checker) {
|
||||
|
@ -566,6 +574,12 @@ func (t *alterIndexTask) PreExecute(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// TODO fubang should implement it when the alter index is reconstructed
|
||||
// typeParams := funcutil.KeyValuePair2Map(t.req.GetExtraParams())
|
||||
// if err = ValidateAutoIndexMmapConfig(typeParams); err != nil {
|
||||
// return err
|
||||
// }
|
||||
|
||||
loaded, err := isCollectionLoaded(ctx, t.querycoord, collection)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -36,7 +36,6 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/config"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
|
@ -1026,6 +1025,38 @@ func Test_parseIndexParams(t *testing.T) {
|
|||
err := cit.parseIndexParams()
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("create auto index and mmap enable", func(t *testing.T) {
|
||||
paramtable.Init()
|
||||
Params.Save(Params.AutoIndexConfig.Enable.Key, "true")
|
||||
defer Params.Reset(Params.AutoIndexConfig.Enable.Key)
|
||||
|
||||
cit := &createIndexTask{
|
||||
Condition: nil,
|
||||
req: &milvuspb.CreateIndexRequest{
|
||||
ExtraParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.IndexTypeKey,
|
||||
Value: AutoIndexName,
|
||||
},
|
||||
{
|
||||
Key: common.MmapEnabledKey,
|
||||
Value: "true",
|
||||
},
|
||||
},
|
||||
IndexName: "",
|
||||
},
|
||||
fieldSchema: &schemapb.FieldSchema{
|
||||
FieldID: 101,
|
||||
Name: "FieldVector",
|
||||
IsPrimaryKey: false,
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
},
|
||||
}
|
||||
|
||||
err := cit.parseIndexParams()
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func Test_wrapUserIndexParams(t *testing.T) {
|
||||
|
@ -1039,16 +1070,15 @@ func Test_wrapUserIndexParams(t *testing.T) {
|
|||
|
||||
func Test_parseIndexParams_AutoIndex_WithType(t *testing.T) {
|
||||
paramtable.Init()
|
||||
mgr := config.NewManager()
|
||||
mgr.SetConfig("autoIndex.enable", "true")
|
||||
Params.AutoIndexConfig.Enable.Init(mgr)
|
||||
Params.Save(Params.AutoIndexConfig.Enable.Key, "true")
|
||||
Params.Save(Params.AutoIndexConfig.IndexParams.Key, `{"M": 30,"efConstruction": 360,"index_type": "HNSW"}`)
|
||||
Params.Save(Params.AutoIndexConfig.SparseIndexParams.Key, `{"drop_ratio_build": 0.2, "index_type": "SPARSE_INVERTED_INDEX"}`)
|
||||
Params.Save(Params.AutoIndexConfig.BinaryIndexParams.Key, `{"nlist": 1024, "index_type": "BIN_IVF_FLAT"}`)
|
||||
|
||||
mgr.SetConfig("autoIndex.params.build", `{"M": 30,"efConstruction": 360,"index_type": "HNSW"}`)
|
||||
mgr.SetConfig("autoIndex.params.sparse.build", `{"drop_ratio_build": 0.2, "index_type": "SPARSE_INVERTED_INDEX"}`)
|
||||
mgr.SetConfig("autoIndex.params.binary.build", `{"nlist": 1024, "index_type": "BIN_IVF_FLAT"}`)
|
||||
Params.AutoIndexConfig.IndexParams.Init(mgr)
|
||||
Params.AutoIndexConfig.SparseIndexParams.Init(mgr)
|
||||
Params.AutoIndexConfig.BinaryIndexParams.Init(mgr)
|
||||
defer Params.Reset(Params.AutoIndexConfig.Enable.Key)
|
||||
defer Params.Reset(Params.AutoIndexConfig.IndexParams.Key)
|
||||
defer Params.Reset(Params.AutoIndexConfig.SparseIndexParams.Key)
|
||||
defer Params.Reset(Params.AutoIndexConfig.BinaryIndexParams.Key)
|
||||
|
||||
floatFieldSchema := &schemapb.FieldSchema{
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
|
@ -1090,7 +1120,6 @@ func Test_parseIndexParams_AutoIndex_WithType(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("case 2, sparse vector parameters", func(t *testing.T) {
|
||||
Params.AutoIndexConfig.IndexParams.Init(mgr)
|
||||
task := &createIndexTask{
|
||||
fieldSchema: sparseFloatFieldSchema,
|
||||
req: &milvuspb.CreateIndexRequest{
|
||||
|
@ -1131,15 +1160,16 @@ func Test_parseIndexParams_AutoIndex_WithType(t *testing.T) {
|
|||
|
||||
func Test_parseIndexParams_AutoIndex(t *testing.T) {
|
||||
paramtable.Init()
|
||||
mgr := config.NewManager()
|
||||
mgr.SetConfig("autoIndex.enable", "false")
|
||||
mgr.SetConfig("autoIndex.params.build", `{"M": 30,"efConstruction": 360,"index_type": "HNSW", "metric_type": "IP"}`)
|
||||
mgr.SetConfig("autoIndex.params.binary.build", `{"nlist": 1024, "index_type": "BIN_IVF_FLAT", "metric_type": "JACCARD"}`)
|
||||
mgr.SetConfig("autoIndex.params.sparse.build", `{"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP"}`)
|
||||
Params.AutoIndexConfig.Enable.Init(mgr)
|
||||
Params.AutoIndexConfig.IndexParams.Init(mgr)
|
||||
Params.AutoIndexConfig.BinaryIndexParams.Init(mgr)
|
||||
Params.AutoIndexConfig.SparseIndexParams.Init(mgr)
|
||||
|
||||
Params.Save(Params.AutoIndexConfig.Enable.Key, "false")
|
||||
Params.Save(Params.AutoIndexConfig.IndexParams.Key, `{"M": 30,"efConstruction": 360,"index_type": "HNSW", "metric_type": "IP"}`)
|
||||
Params.Save(Params.AutoIndexConfig.BinaryIndexParams.Key, `{"nlist": 1024, "index_type": "BIN_IVF_FLAT", "metric_type": "JACCARD"}`)
|
||||
Params.Save(Params.AutoIndexConfig.SparseIndexParams.Key, `{"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP"}`)
|
||||
defer Params.Reset(Params.AutoIndexConfig.Enable.Key)
|
||||
defer Params.Reset(Params.AutoIndexConfig.IndexParams.Key)
|
||||
defer Params.Reset(Params.AutoIndexConfig.BinaryIndexParams.Key)
|
||||
defer Params.Reset(Params.AutoIndexConfig.SparseIndexParams.Key)
|
||||
|
||||
autoIndexConfig := Params.AutoIndexConfig.IndexParams.GetAsJSONMap()
|
||||
autoIndexConfigBinary := Params.AutoIndexConfig.BinaryIndexParams.GetAsJSONMap()
|
||||
autoIndexConfigSparse := Params.AutoIndexConfig.SparseIndexParams.GetAsJSONMap()
|
||||
|
|
|
@ -1801,6 +1801,61 @@ func TestTask_Int64PrimaryKey(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestIndexType(t *testing.T) {
|
||||
rc := NewRootCoordMock()
|
||||
defer rc.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
shardsNum := int32(2)
|
||||
prefix := "TestTask_all"
|
||||
dbName := ""
|
||||
collectionName := prefix + funcutil.GenRandomStr()
|
||||
|
||||
fieldName2Types := map[string]schemapb.DataType{
|
||||
testBoolField: schemapb.DataType_Bool,
|
||||
testInt32Field: schemapb.DataType_Int32,
|
||||
testInt64Field: schemapb.DataType_Int64,
|
||||
testFloatField: schemapb.DataType_Float,
|
||||
testDoubleField: schemapb.DataType_Double,
|
||||
testFloatVecField: schemapb.DataType_FloatVector,
|
||||
}
|
||||
|
||||
t.Run("invalid type param", func(t *testing.T) {
|
||||
paramtable.Init()
|
||||
Params.Save(Params.AutoIndexConfig.Enable.Key, "true")
|
||||
defer Params.Reset(Params.AutoIndexConfig.Enable.Key)
|
||||
|
||||
schema := constructCollectionSchemaByDataType(collectionName, fieldName2Types, testInt64Field, false)
|
||||
for _, field := range schema.Fields {
|
||||
dataType := field.GetDataType()
|
||||
if typeutil.IsVectorType(dataType) {
|
||||
field.IndexParams = append(field.IndexParams, &commonpb.KeyValuePair{
|
||||
Key: common.MmapEnabledKey,
|
||||
Value: "true",
|
||||
})
|
||||
break
|
||||
}
|
||||
}
|
||||
marshaledSchema, err := proto.Marshal(schema)
|
||||
assert.NoError(t, err)
|
||||
|
||||
createColT := &createCollectionTask{
|
||||
Condition: NewTaskCondition(ctx),
|
||||
CreateCollectionRequest: &milvuspb.CreateCollectionRequest{
|
||||
Base: nil,
|
||||
DbName: dbName,
|
||||
CollectionName: collectionName,
|
||||
Schema: marshaledSchema,
|
||||
ShardsNum: shardsNum,
|
||||
},
|
||||
ctx: ctx,
|
||||
rootCoord: rc,
|
||||
}
|
||||
assert.NoError(t, createColT.OnEnqueue())
|
||||
assert.Error(t, createColT.PreExecute(ctx))
|
||||
})
|
||||
}
|
||||
|
||||
func TestTask_VarCharPrimaryKey(t *testing.T) {
|
||||
var err error
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
|
@ -932,3 +933,15 @@ func newValidateUtil(opts ...validateOption) *validateUtil {
|
|||
|
||||
return v
|
||||
}
|
||||
|
||||
func ValidateAutoIndexMmapConfig(isVectorField bool, indexParams map[string]string) error {
|
||||
if !Params.AutoIndexConfig.Enable.GetAsBool() {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, ok := indexParams[common.MmapEnabledKey]
|
||||
if ok && isVectorField {
|
||||
return fmt.Errorf("mmap index is not supported to config for the collection in auto index mode")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package task
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/samber/lo"
|
||||
|
@ -135,12 +136,12 @@ func packLoadSegmentRequest(
|
|||
loadScope = querypb.LoadScope_Delta
|
||||
}
|
||||
// field mmap enabled if collection-level mmap enabled or the field mmap enabled
|
||||
collectionMmapEnabled := common.IsMmapEnabled(collectionProperties...)
|
||||
collectionMmapEnabled, exist := common.IsMmapDataEnabled(collectionProperties...)
|
||||
for _, field := range schema.GetFields() {
|
||||
if collectionMmapEnabled {
|
||||
if exist {
|
||||
field.TypeParams = append(field.TypeParams, &commonpb.KeyValuePair{
|
||||
Key: common.MmapEnabledKey,
|
||||
Value: "true",
|
||||
Value: strconv.FormatBool(collectionMmapEnabled),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -83,7 +83,9 @@ func (s *UtilsSuite) TestPackLoadSegmentRequest() {
|
|||
s.Equal(task.ReplicaID(), req.ReplicaID)
|
||||
s.Equal(action.Node(), req.GetDstNodeID())
|
||||
for _, field := range req.GetSchema().GetFields() {
|
||||
s.False(common.IsMmapEnabled(field.GetTypeParams()...))
|
||||
mmapEnable, ok := common.IsMmapDataEnabled(field.GetTypeParams()...)
|
||||
s.False(mmapEnable)
|
||||
s.True(ok)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -136,7 +138,9 @@ func (s *UtilsSuite) TestPackLoadSegmentRequestMmap() {
|
|||
s.Equal(task.ReplicaID(), req.ReplicaID)
|
||||
s.Equal(action.Node(), req.GetDstNodeID())
|
||||
for _, field := range req.GetSchema().GetFields() {
|
||||
s.True(common.IsMmapEnabled(field.GetTypeParams()...))
|
||||
mmapEnable, ok := common.IsMmapDataEnabled(field.GetTypeParams()...)
|
||||
s.True(mmapEnable)
|
||||
s.True(ok)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,20 +29,10 @@ import (
|
|||
"runtime"
|
||||
"unsafe"
|
||||
|
||||
"github.com/pingcap/log"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/datacoord"
|
||||
"github.com/milvus-io/milvus/internal/proto/cgopb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparams"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
// LoadIndexInfo is a wrapper of the underlying C-structure C.CLoadIndexInfo
|
||||
|
@ -73,67 +63,6 @@ func deleteLoadIndexInfo(info *LoadIndexInfo) {
|
|||
}).Await()
|
||||
}
|
||||
|
||||
func isIndexMmapEnable(indexInfo *querypb.FieldIndexInfo) bool {
|
||||
enableMmap := common.IsMmapEnabled(indexInfo.IndexParams...)
|
||||
if !enableMmap {
|
||||
_, ok := funcutil.KeyValuePair2Map(indexInfo.IndexParams)[common.MmapEnabledKey]
|
||||
indexType := datacoord.GetIndexType(indexInfo.IndexParams)
|
||||
indexSupportMmap := indexparamcheck.IsMmapSupported(indexType)
|
||||
enableMmap = !ok && params.Params.QueryNodeCfg.MmapEnabled.GetAsBool() && indexSupportMmap
|
||||
}
|
||||
return enableMmap
|
||||
}
|
||||
|
||||
func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, collectionID int64, partitionID int64, segmentID int64, fieldType schemapb.DataType) error {
|
||||
fieldID := indexInfo.FieldID
|
||||
indexPaths := indexInfo.IndexFilePaths
|
||||
|
||||
indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams)
|
||||
|
||||
enableMmap := isIndexMmapEnable(indexInfo)
|
||||
// as Knowhere reports error if encounter a unknown param, we need to delete it
|
||||
delete(indexParams, common.MmapEnabledKey)
|
||||
|
||||
mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()
|
||||
err := li.appendFieldInfo(ctx, collectionID, partitionID, segmentID, fieldID, fieldType, enableMmap, mmapDirPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = li.appendIndexInfo(ctx, indexInfo.IndexID, indexInfo.BuildID, indexInfo.IndexVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// some build params also exist in indexParams, which are useless during loading process
|
||||
if indexParams["index_type"] == indexparamcheck.IndexDISKANN {
|
||||
err = indexparams.SetDiskIndexLoadParams(paramtable.Get(), indexParams, indexInfo.GetNumRows())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err = indexparams.AppendPrepareLoadParams(paramtable.Get(), indexParams)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("load with index params", zap.Any("indexParams", indexParams))
|
||||
for key, value := range indexParams {
|
||||
err = li.appendIndexParam(ctx, key, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := li.appendIndexEngineVersion(ctx, indexInfo.GetCurrentIndexVersion()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = li.appendIndexData(ctx, indexPaths)
|
||||
return err
|
||||
}
|
||||
|
||||
// appendIndexParam append indexParam to index
|
||||
func (li *LoadIndexInfo) appendIndexParam(ctx context.Context, indexKey string, indexValue string) error {
|
||||
var status C.CStatus
|
||||
|
|
|
@ -102,7 +102,7 @@ func (suite *ReduceSuite) SetupTest() {
|
|||
)
|
||||
suite.Require().NoError(err)
|
||||
for _, binlog := range binlogs {
|
||||
err = suite.segment.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog, false)
|
||||
err = suite.segment.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog)
|
||||
suite.Require().NoError(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -109,7 +109,7 @@ func (suite *RetrieveSuite) SetupTest() {
|
|||
)
|
||||
suite.Require().NoError(err)
|
||||
for _, binlog := range binlogs {
|
||||
err = suite.sealed.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog, false)
|
||||
err = suite.sealed.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog)
|
||||
suite.Require().NoError(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ func (suite *SearchSuite) SetupTest() {
|
|||
)
|
||||
suite.Require().NoError(err)
|
||||
for _, binlog := range binlogs {
|
||||
err = suite.sealed.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog, false)
|
||||
err = suite.sealed.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog)
|
||||
suite.Require().NoError(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -873,7 +873,7 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog, useMmap bool) error {
|
||||
func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCount int64, field *datapb.FieldBinlog) error {
|
||||
if !s.ptrLock.RLockIf(state.IsNotReleased) {
|
||||
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
|
||||
}
|
||||
|
@ -911,9 +911,13 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun
|
|||
}
|
||||
}
|
||||
|
||||
// TODO retrieve_enable should be considered
|
||||
collection := s.collection
|
||||
mmapEnabled := useMmap || common.IsFieldMmapEnabled(collection.Schema(), fieldID) ||
|
||||
(!common.FieldHasMmapKey(collection.Schema(), fieldID) && params.Params.QueryNodeCfg.MmapEnabled.GetAsBool())
|
||||
fieldSchema, err := getFieldSchema(collection.Schema(), fieldID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mmapEnabled := isDataMmapEnable(fieldSchema)
|
||||
loadFieldDataInfo.appendMMapDirPath(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue())
|
||||
loadFieldDataInfo.enableMmap(fieldID, mmapEnabled)
|
||||
|
||||
|
@ -1116,12 +1120,14 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
|
|||
return err
|
||||
}
|
||||
|
||||
enableMmap := isIndexMmapEnable(fieldSchema, indexInfo)
|
||||
|
||||
indexInfoProto := &cgopb.LoadIndexInfo{
|
||||
CollectionID: s.Collection(),
|
||||
PartitionID: s.Partition(),
|
||||
SegmentID: s.ID(),
|
||||
Field: fieldSchema,
|
||||
EnableMmap: isIndexMmapEnable(indexInfo),
|
||||
EnableMmap: enableMmap,
|
||||
MmapDirPath: paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue(),
|
||||
IndexID: indexInfo.GetIndexID(),
|
||||
IndexBuildID: indexInfo.GetBuildID(),
|
||||
|
|
|
@ -44,7 +44,6 @@ import (
|
|||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
|
@ -662,7 +661,7 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu
|
|||
zap.String("index", info.IndexInfo.GetIndexName()),
|
||||
)
|
||||
// for scalar index's raw data, only load to mmap not memory
|
||||
if err = segment.LoadFieldData(ctx, fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog, true); err != nil {
|
||||
if err = segment.LoadFieldData(ctx, fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog); err != nil {
|
||||
log.Warn("load raw data failed", zap.Int64("fieldID", fieldID), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
@ -816,11 +815,7 @@ func loadSealedSegmentFields(ctx context.Context, collection *Collection, segmen
|
|||
fieldBinLog := field
|
||||
fieldID := field.FieldID
|
||||
runningGroup.Go(func() error {
|
||||
return segment.LoadFieldData(ctx,
|
||||
fieldID,
|
||||
rowCount,
|
||||
fieldBinLog,
|
||||
false)
|
||||
return segment.LoadFieldData(ctx, fieldID, rowCount, fieldBinLog)
|
||||
})
|
||||
}
|
||||
err := runningGroup.Wait()
|
||||
|
@ -1207,17 +1202,30 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
|
|||
var indexMemorySize uint64
|
||||
var mmapFieldCount int
|
||||
|
||||
vecFieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
|
||||
fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
|
||||
for _, fieldIndexInfo := range loadInfo.IndexInfos {
|
||||
fieldID := fieldIndexInfo.FieldID
|
||||
vecFieldID2IndexInfo[fieldID] = fieldIndexInfo
|
||||
fieldID2IndexInfo[fieldID] = fieldIndexInfo
|
||||
}
|
||||
|
||||
schemaHelper, err := typeutil.CreateSchemaHelper(schema)
|
||||
if err != nil {
|
||||
log.Warn("failed to create schema helper", zap.String("name", schema.GetName()), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
for _, fieldBinlog := range loadInfo.BinlogPaths {
|
||||
fieldID := fieldBinlog.FieldID
|
||||
var mmapEnabled bool
|
||||
if fieldIndexInfo, ok := vecFieldID2IndexInfo[fieldID]; ok {
|
||||
mmapEnabled = isIndexMmapEnable(fieldIndexInfo)
|
||||
// TODO retrieve_enable should be considered
|
||||
fieldSchema, err := schemaHelper.GetFieldFromID(fieldID)
|
||||
if err != nil {
|
||||
log.Warn("failed to get field schema", zap.Int64("fieldID", fieldID), zap.String("name", schema.GetName()), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
binlogSize := uint64(getBinlogDataMemorySize(fieldBinlog))
|
||||
|
||||
if fieldIndexInfo, ok := fieldID2IndexInfo[fieldID]; ok {
|
||||
mmapEnabled = isIndexMmapEnable(fieldSchema, fieldIndexInfo)
|
||||
neededMemSize, neededDiskSize, err := getIndexAttrCache().GetIndexResourceUsage(fieldIndexInfo, multiplyFactor.memoryIndexUsageFactor, fieldBinlog)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to get index size collection %d, segment %d, indexBuildID %d",
|
||||
|
@ -1231,10 +1239,18 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
|
|||
} else {
|
||||
segmentDiskSize += neededDiskSize
|
||||
}
|
||||
if !hasRawData(fieldIndexInfo) {
|
||||
dataMmapEnable := isDataMmapEnable(fieldSchema)
|
||||
segmentMemorySize += binlogSize
|
||||
if dataMmapEnable {
|
||||
segmentDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog))
|
||||
} else {
|
||||
segmentMemorySize += binlogSize
|
||||
}
|
||||
}
|
||||
} else {
|
||||
mmapEnabled = common.IsFieldMmapEnabled(schema, fieldID) ||
|
||||
(!common.FieldHasMmapKey(schema, fieldID) && params.Params.QueryNodeCfg.MmapEnabled.GetAsBool())
|
||||
binlogSize := uint64(getBinlogDataMemorySize(fieldBinlog))
|
||||
mmapEnabled = isDataMmapEnable(fieldSchema)
|
||||
|
||||
segmentMemorySize += binlogSize
|
||||
if mmapEnabled {
|
||||
segmentDiskSize += uint64(getBinlogDataDiskSize(fieldBinlog))
|
||||
|
|
|
@ -105,7 +105,7 @@ func (suite *SegmentSuite) SetupTest() {
|
|||
g, err := suite.sealed.(*LocalSegment).StartLoadData()
|
||||
suite.Require().NoError(err)
|
||||
for _, binlog := range binlogs {
|
||||
err = suite.sealed.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog, false)
|
||||
err = suite.sealed.(*LocalSegment).LoadFieldData(ctx, binlog.FieldID, int64(msgLength), binlog)
|
||||
suite.Require().NoError(err)
|
||||
}
|
||||
g.Done(nil)
|
||||
|
|
|
@ -26,12 +26,14 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/segments/metricsutil"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/contextutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
|
@ -244,3 +246,46 @@ func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 {
|
|||
|
||||
return fieldSize
|
||||
}
|
||||
|
||||
func getFieldSchema(schema *schemapb.CollectionSchema, fieldID int64) (*schemapb.FieldSchema, error) {
|
||||
for _, field := range schema.Fields {
|
||||
if field.FieldID == fieldID {
|
||||
return field, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("field %d not found in schema", fieldID)
|
||||
}
|
||||
|
||||
func isIndexMmapEnable(fieldSchema *schemapb.FieldSchema, indexInfo *querypb.FieldIndexInfo) bool {
|
||||
enableMmap, exist := common.IsMmapIndexEnabled(indexInfo.IndexParams...)
|
||||
if exist {
|
||||
return enableMmap
|
||||
}
|
||||
indexType := common.GetIndexType(indexInfo.IndexParams)
|
||||
var indexSupportMmap bool
|
||||
var defaultEnableMmap bool
|
||||
if typeutil.IsVectorType(fieldSchema.GetDataType()) {
|
||||
indexSupportMmap = indexparamcheck.IsVectorMmapIndex(indexType)
|
||||
defaultEnableMmap = params.Params.QueryNodeCfg.MmapVectorIndex.GetAsBool()
|
||||
} else {
|
||||
indexSupportMmap = indexparamcheck.IsScalarMmapIndex(indexType)
|
||||
defaultEnableMmap = params.Params.QueryNodeCfg.MmapScalarIndex.GetAsBool()
|
||||
}
|
||||
return indexSupportMmap && defaultEnableMmap
|
||||
}
|
||||
|
||||
func isDataMmapEnable(fieldSchema *schemapb.FieldSchema) bool {
|
||||
enableMmap, exist := common.IsMmapDataEnabled(fieldSchema.GetTypeParams()...)
|
||||
if exist {
|
||||
return enableMmap
|
||||
}
|
||||
if typeutil.IsVectorType(fieldSchema.GetDataType()) {
|
||||
return params.Params.QueryNodeCfg.MmapVectorField.GetAsBool()
|
||||
}
|
||||
return params.Params.QueryNodeCfg.MmapScalarField.GetAsBool()
|
||||
}
|
||||
|
||||
func hasRawData(indexInfo *querypb.FieldIndexInfo) bool {
|
||||
log.Warn("hasRawData is not implemented, please check it", zap.Int64("field_id", indexInfo.FieldID))
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -5,8 +5,13 @@ import (
|
|||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
)
|
||||
|
||||
func TestFilterZeroValuesFromSlice(t *testing.T) {
|
||||
|
@ -75,3 +80,111 @@ func TestGetSegmentRelatedDataSize(t *testing.T) {
|
|||
assert.EqualValues(t, 100, GetSegmentRelatedDataSize(segment))
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetFieldSchema(t *testing.T) {
|
||||
t.Run("no error", func(t *testing.T) {
|
||||
filedSchema, err := getFieldSchema(&schemapb.CollectionSchema{
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: 1,
|
||||
},
|
||||
},
|
||||
}, 1)
|
||||
assert.NotNil(t, filedSchema)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("error", func(t *testing.T) {
|
||||
filedSchema, err := getFieldSchema(&schemapb.CollectionSchema{
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: 2,
|
||||
},
|
||||
},
|
||||
}, 1)
|
||||
assert.Nil(t, filedSchema)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestIsIndexMmapEnable(t *testing.T) {
|
||||
paramtable.Init()
|
||||
|
||||
t.Run("mmap index param exist", func(t *testing.T) {
|
||||
enable := isIndexMmapEnable(&schemapb.FieldSchema{}, &querypb.FieldIndexInfo{
|
||||
IndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.MmapEnabledKey,
|
||||
Value: "false",
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.False(t, enable)
|
||||
})
|
||||
|
||||
t.Run("mmap vector index param not exist", func(t *testing.T) {
|
||||
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.MmapVectorIndex.Key, "true")
|
||||
defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.MmapVectorIndex.Key)
|
||||
enable := isIndexMmapEnable(&schemapb.FieldSchema{
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
}, &querypb.FieldIndexInfo{
|
||||
IndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.IndexTypeKey,
|
||||
Value: indexparamcheck.IndexFaissIvfFlat,
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.True(t, enable)
|
||||
})
|
||||
|
||||
t.Run("mmap scalar index param not exist", func(t *testing.T) {
|
||||
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.MmapScalarIndex.Key, "true")
|
||||
defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.MmapScalarIndex.Key)
|
||||
enable := isIndexMmapEnable(&schemapb.FieldSchema{
|
||||
DataType: schemapb.DataType_String,
|
||||
}, &querypb.FieldIndexInfo{
|
||||
IndexParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.IndexTypeKey,
|
||||
Value: indexparamcheck.IndexINVERTED,
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.True(t, enable)
|
||||
})
|
||||
}
|
||||
|
||||
func TestIsDataMmmapEnable(t *testing.T) {
|
||||
paramtable.Init()
|
||||
|
||||
t.Run("mmap data param exist", func(t *testing.T) {
|
||||
enable := isDataMmapEnable(&schemapb.FieldSchema{
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.MmapEnabledKey,
|
||||
Value: "true",
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.True(t, enable)
|
||||
})
|
||||
|
||||
t.Run("mmap scalar data param not exist", func(t *testing.T) {
|
||||
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.MmapScalarField.Key, "true")
|
||||
defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.MmapScalarField.Key)
|
||||
enable := isDataMmapEnable(&schemapb.FieldSchema{
|
||||
DataType: schemapb.DataType_String,
|
||||
})
|
||||
assert.True(t, enable)
|
||||
})
|
||||
|
||||
t.Run("mmap vector data param not exist", func(t *testing.T) {
|
||||
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.MmapVectorField.Key, "true")
|
||||
defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.MmapVectorField.Key)
|
||||
enable := isDataMmapEnable(&schemapb.FieldSchema{
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
})
|
||||
assert.True(t, enable)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -376,6 +376,11 @@ func (node *QueryNode) Start() error {
|
|||
paramtable.SetUpdateTime(time.Now())
|
||||
mmapEnabled := paramtable.Get().QueryNodeCfg.MmapEnabled.GetAsBool()
|
||||
growingmmapEnable := paramtable.Get().QueryNodeCfg.GrowingMmapEnabled.GetAsBool()
|
||||
mmapVectorIndex := paramtable.Get().QueryNodeCfg.MmapVectorIndex.GetAsBool()
|
||||
mmapVectorField := paramtable.Get().QueryNodeCfg.MmapVectorField.GetAsBool()
|
||||
mmapScarlarIndex := paramtable.Get().QueryNodeCfg.MmapScalarIndex.GetAsBool()
|
||||
mmapScarlarField := paramtable.Get().QueryNodeCfg.MmapScalarField.GetAsBool()
|
||||
|
||||
node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
|
||||
registry.GetInMemoryResolver().RegisterQueryNode(node.GetNodeID(), node)
|
||||
|
@ -384,6 +389,10 @@ func (node *QueryNode) Start() error {
|
|||
zap.String("Address", node.address),
|
||||
zap.Bool("mmapEnabled", mmapEnabled),
|
||||
zap.Bool("growingmmapEnable", growingmmapEnable),
|
||||
zap.Bool("mmapVectorIndex", mmapVectorIndex),
|
||||
zap.Bool("mmapVectorField", mmapVectorField),
|
||||
zap.Bool("mmapScarlarIndex", mmapScarlarIndex),
|
||||
zap.Bool("mmapScarlarField", mmapScarlarField),
|
||||
)
|
||||
})
|
||||
|
||||
|
|
|
@ -95,7 +95,6 @@ func (suite *ServiceSuite) SetupSuite() {
|
|||
// init param
|
||||
paramtable.Init()
|
||||
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GCEnabled.Key, "false")
|
||||
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.CacheEnabled.Key, "false")
|
||||
|
||||
suite.rootPath = suite.T().Name()
|
||||
suite.collectionID = 111
|
||||
|
@ -1709,7 +1708,7 @@ func (suite *ServiceSuite) TestShowConfigurations_Normal() {
|
|||
MsgID: rand.Int63(),
|
||||
TargetID: suite.node.session.ServerID,
|
||||
},
|
||||
Pattern: "Cache.enabled",
|
||||
Pattern: "mmap.growingMmapEnabled",
|
||||
}
|
||||
|
||||
resp, err := suite.node.ShowConfigurations(ctx, req)
|
||||
|
@ -1725,7 +1724,7 @@ func (suite *ServiceSuite) TestShowConfigurations_Failed() {
|
|||
MsgID: rand.Int63(),
|
||||
TargetID: suite.node.session.ServerID,
|
||||
},
|
||||
Pattern: "Cache.enabled",
|
||||
Pattern: "mmap.growingMmapEnabled",
|
||||
}
|
||||
|
||||
// node not healthy
|
||||
|
|
|
@ -182,22 +182,34 @@ func IsSystemField(fieldID int64) bool {
|
|||
return fieldID < StartOfUserFieldID
|
||||
}
|
||||
|
||||
func IsMmapEnabled(kvs ...*commonpb.KeyValuePair) bool {
|
||||
func IsMmapDataEnabled(kvs ...*commonpb.KeyValuePair) (bool, bool) {
|
||||
for _, kv := range kvs {
|
||||
if kv.Key == MmapEnabledKey && strings.ToLower(kv.Value) == "true" {
|
||||
return true
|
||||
if kv.Key == MmapEnabledKey {
|
||||
enable, _ := strconv.ParseBool(kv.Value)
|
||||
return enable, true
|
||||
}
|
||||
}
|
||||
return false
|
||||
return false, false
|
||||
}
|
||||
|
||||
func IsFieldMmapEnabled(schema *schemapb.CollectionSchema, fieldID int64) bool {
|
||||
for _, field := range schema.GetFields() {
|
||||
if field.GetFieldID() == fieldID {
|
||||
return IsMmapEnabled(field.GetTypeParams()...)
|
||||
func IsMmapIndexEnabled(kvs ...*commonpb.KeyValuePair) (bool, bool) {
|
||||
for _, kv := range kvs {
|
||||
if kv.Key == MmapEnabledKey {
|
||||
enable, _ := strconv.ParseBool(kv.Value)
|
||||
return enable, true
|
||||
}
|
||||
}
|
||||
return false
|
||||
return false, false
|
||||
}
|
||||
|
||||
func GetIndexType(indexParams []*commonpb.KeyValuePair) string {
|
||||
for _, param := range indexParams {
|
||||
if param.Key == IndexTypeKey {
|
||||
return param.Value
|
||||
}
|
||||
}
|
||||
log.Warn("IndexType not found in indexParams")
|
||||
return ""
|
||||
}
|
||||
|
||||
func FieldHasMmapKey(schema *schemapb.CollectionSchema, fieldID int64) bool {
|
||||
|
|
|
@ -11,11 +11,19 @@
|
|||
|
||||
package indexparamcheck
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
)
|
||||
|
||||
// IndexType string.
|
||||
type IndexType = string
|
||||
|
||||
// IndexType definitions
|
||||
const (
|
||||
// vector index
|
||||
IndexGpuBF IndexType = "GPU_BRUTE_FORCE"
|
||||
IndexRaftIvfFlat IndexType = "GPU_IVF_FLAT"
|
||||
IndexRaftIvfPQ IndexType = "GPU_IVF_PQ"
|
||||
|
@ -32,13 +40,14 @@ const (
|
|||
IndexDISKANN IndexType = "DISKANN"
|
||||
IndexSparseInverted IndexType = "SPARSE_INVERTED_INDEX"
|
||||
IndexSparseWand IndexType = "SPARSE_WAND"
|
||||
IndexINVERTED IndexType = "INVERTED"
|
||||
|
||||
IndexSTLSORT IndexType = "STL_SORT"
|
||||
IndexTRIE IndexType = "TRIE"
|
||||
IndexTrie IndexType = "Trie"
|
||||
IndexBitmap IndexType = "BITMAP"
|
||||
IndexHybrid IndexType = "HYBRID"
|
||||
// scalar index
|
||||
IndexSTLSORT IndexType = "STL_SORT"
|
||||
IndexTRIE IndexType = "TRIE"
|
||||
IndexTrie IndexType = "Trie"
|
||||
IndexBitmap IndexType = "BITMAP"
|
||||
IndexHybrid IndexType = "HYBRID" // BITMAP + INVERTED
|
||||
IndexINVERTED IndexType = "INVERTED"
|
||||
|
||||
AutoIndex IndexType = "AUTOINDEX"
|
||||
)
|
||||
|
@ -50,7 +59,8 @@ func IsGpuIndex(indexType IndexType) bool {
|
|||
indexType == IndexRaftCagra
|
||||
}
|
||||
|
||||
func IsMmapSupported(indexType IndexType) bool {
|
||||
// IsVectorMmapIndex reports whether the given vector index type can be loaded with mmap.
|
||||
func IsVectorMmapIndex(indexType IndexType) bool {
|
||||
return indexType == IndexFaissIDMap ||
|
||||
indexType == IndexFaissIvfFlat ||
|
||||
indexType == IndexFaissIvfPQ ||
|
||||
|
@ -66,3 +76,23 @@ func IsMmapSupported(indexType IndexType) bool {
|
|||
func IsDiskIndex(indexType IndexType) bool {
|
||||
return indexType == IndexDISKANN
|
||||
}
|
||||
|
||||
func IsScalarMmapIndex(indexType IndexType) bool {
|
||||
return indexType == IndexINVERTED
|
||||
}
|
||||
|
||||
func ValidateMmapIndexParams(indexType IndexType, indexParams map[string]string) error {
|
||||
mmapEnable, ok := indexParams[common.MmapEnabledKey]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
enable, err := strconv.ParseBool(mmapEnable)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid %s value: %s, expected: true, false", common.MmapEnabledKey, mmapEnable)
|
||||
}
|
||||
mmapSupport := indexType == AutoIndex || IsVectorMmapIndex(indexType) || IsScalarMmapIndex(indexType)
|
||||
if enable && !mmapSupport {
|
||||
return fmt.Errorf("index type %s does not support mmap", indexType)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* Licensed to the LF AI & Data foundation under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package indexparamcheck
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
)
|
||||
|
||||
func TestIsScalarMmapIndex(t *testing.T) {
|
||||
t.Run("inverted index", func(t *testing.T) {
|
||||
assert.True(t, IsScalarMmapIndex(IndexINVERTED))
|
||||
})
|
||||
}
|
||||
|
||||
func TestIsVectorMmapIndex(t *testing.T) {
|
||||
t.Run("vector index", func(t *testing.T) {
|
||||
assert.True(t, IsVectorMmapIndex(IndexFaissIDMap))
|
||||
assert.False(t, IsVectorMmapIndex(IndexINVERTED))
|
||||
})
|
||||
}
|
||||
|
||||
func TestValidateMmapTypeParams(t *testing.T) {
|
||||
t.Run("inverted mmap enable", func(t *testing.T) {
|
||||
err := ValidateMmapIndexParams(IndexINVERTED, map[string]string{
|
||||
common.MmapEnabledKey: "true",
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("inverted mmap enable", func(t *testing.T) {
|
||||
err := ValidateMmapIndexParams(IndexINVERTED, map[string]string{})
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("invalid mmap enable value", func(t *testing.T) {
|
||||
err := ValidateMmapIndexParams(IndexINVERTED, map[string]string{
|
||||
common.MmapEnabledKey: "invalid",
|
||||
})
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("invalid mmap enable type", func(t *testing.T) {
|
||||
err := ValidateMmapIndexParams(IndexGpuBF, map[string]string{
|
||||
common.MmapEnabledKey: "true",
|
||||
})
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
|
@ -2339,10 +2339,14 @@ type queryNodeConfig struct {
|
|||
DiskCacheCapacityLimit ParamItem `refreshable:"true"`
|
||||
|
||||
// cache limit
|
||||
CacheEnabled ParamItem `refreshable:"false"`
|
||||
CacheMemoryLimit ParamItem `refreshable:"false"`
|
||||
MmapDirPath ParamItem `refreshable:"false"`
|
||||
CacheMemoryLimit ParamItem `refreshable:"false"`
|
||||
MmapDirPath ParamItem `refreshable:"false"`
|
||||
// Deprecated: Since 2.4.7, use `MmapVectorField`/`MmapVectorIndex`/`MmapScalarField`/`MmapScalarIndex` instead
|
||||
MmapEnabled ParamItem `refreshable:"false"`
|
||||
MmapVectorField ParamItem `refreshable:"false"`
|
||||
MmapVectorIndex ParamItem `refreshable:"false"`
|
||||
MmapScalarField ParamItem `refreshable:"false"`
|
||||
MmapScalarIndex ParamItem `refreshable:"false"`
|
||||
GrowingMmapEnabled ParamItem `refreshable:"false"`
|
||||
FixedFileSizeForMmapManager ParamItem `refreshable:"false"`
|
||||
MaxMmapDiskPercentageForMmapManager ParamItem `refreshable:"false"`
|
||||
|
@ -2578,14 +2582,6 @@ This defaults to true, indicating that Milvus creates temporary index for growin
|
|||
}
|
||||
p.CacheMemoryLimit.Init(base.mgr)
|
||||
|
||||
p.CacheEnabled = ParamItem{
|
||||
Key: "queryNode.cache.enabled",
|
||||
Version: "2.0.0",
|
||||
DefaultValue: "",
|
||||
Export: true,
|
||||
}
|
||||
p.CacheEnabled.Init(base.mgr)
|
||||
|
||||
p.MmapDirPath = ParamItem{
|
||||
Key: "queryNode.mmap.mmapDirPath",
|
||||
Version: "2.3.0",
|
||||
|
@ -2606,11 +2602,71 @@ This defaults to true, indicating that Milvus creates temporary index for growin
|
|||
Version: "2.4.0",
|
||||
DefaultValue: "false",
|
||||
FallbackKeys: []string{"queryNode.mmapEnabled"},
|
||||
Doc: "Enable mmap for loading data",
|
||||
Export: true,
|
||||
Doc: "Deprecated: Enable mmap for loading data, including vector/scalar data and index",
|
||||
Export: false,
|
||||
}
|
||||
p.MmapEnabled.Init(base.mgr)
|
||||
|
||||
p.MmapVectorField = ParamItem{
|
||||
Key: "queryNode.mmap.vectorField",
|
||||
Version: "2.4.7",
|
||||
DefaultValue: "false",
|
||||
Formatter: func(originValue string) string {
|
||||
if p.MmapEnabled.GetAsBool() {
|
||||
return "true"
|
||||
}
|
||||
return originValue
|
||||
},
|
||||
Doc: "Enable mmap for loading vector data",
|
||||
Export: true,
|
||||
}
|
||||
p.MmapVectorField.Init(base.mgr)
|
||||
|
||||
p.MmapVectorIndex = ParamItem{
|
||||
Key: "queryNode.mmap.vectorIndex",
|
||||
Version: "2.4.7",
|
||||
DefaultValue: "false",
|
||||
Formatter: func(originValue string) string {
|
||||
if p.MmapEnabled.GetAsBool() {
|
||||
return "true"
|
||||
}
|
||||
return originValue
|
||||
},
|
||||
Doc: "Enable mmap for loading vector index",
|
||||
Export: true,
|
||||
}
|
||||
p.MmapVectorIndex.Init(base.mgr)
|
||||
|
||||
p.MmapScalarField = ParamItem{
|
||||
Key: "queryNode.mmap.scalarField",
|
||||
Version: "2.4.7",
|
||||
DefaultValue: "false",
|
||||
Formatter: func(originValue string) string {
|
||||
if p.MmapEnabled.GetAsBool() {
|
||||
return "true"
|
||||
}
|
||||
return originValue
|
||||
},
|
||||
Doc: "Enable mmap for loading scalar data",
|
||||
Export: true,
|
||||
}
|
||||
p.MmapScalarField.Init(base.mgr)
|
||||
|
||||
p.MmapScalarIndex = ParamItem{
|
||||
Key: "queryNode.mmap.scalarIndex",
|
||||
Version: "2.4.7",
|
||||
DefaultValue: "false",
|
||||
Formatter: func(originValue string) string {
|
||||
if p.MmapEnabled.GetAsBool() {
|
||||
return "true"
|
||||
}
|
||||
return originValue
|
||||
},
|
||||
Doc: "Enable mmap for loading scalar index",
|
||||
Export: true,
|
||||
}
|
||||
p.MmapScalarIndex.Init(base.mgr)
|
||||
|
||||
p.GrowingMmapEnabled = ParamItem{
|
||||
Key: "queryNode.mmap.growingMmapEnabled",
|
||||
Version: "2.4.6",
|
||||
|
|
Loading…
Reference in New Issue