mirror of https://github.com/milvus-io/milvus.git
https://github.com/milvus-io/milvus/issues/31279 related pr: #31267 Signed-off-by: sunby <sunbingyi1992@gmail.com>pull/31372/head
parent
89cff29b6a
commit
e7b053817d
|
@ -354,6 +354,7 @@ queryNode:
|
|||
taskQueueExpire: 60 # 1 min by default, expire time of inner user task queue since queue is empty.
|
||||
enableCrossUserGrouping: false # false by default Enable Cross user grouping when using user-task-polling policy. (close it if task of any user can not merge others).
|
||||
maxPendingTaskPerUser: 1024 # 50 by default, max pending task in scheduler per user.
|
||||
mmapEnabled: false # enable mmap global, if set true, will use mmap to load segment data
|
||||
|
||||
# can specify ip for example
|
||||
# ip: 127.0.0.1
|
||||
|
|
|
@ -343,7 +343,7 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
|
|||
}
|
||||
vectorFields := typeutil.GetVectorFieldSchemas(collMeta.Schema)
|
||||
fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) {
|
||||
return t.FieldID, getIndexType(t.IndexParams)
|
||||
return t.FieldID, GetIndexType(t.IndexParams)
|
||||
})
|
||||
vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool {
|
||||
if indexType, ok := fieldIndexTypes[field.FieldID]; ok {
|
||||
|
|
|
@ -255,7 +255,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
|
|||
return true
|
||||
}
|
||||
indexParams := ib.meta.indexMeta.GetIndexParams(meta.CollectionID, meta.IndexID)
|
||||
indexType := getIndexType(indexParams)
|
||||
indexType := GetIndexType(indexParams)
|
||||
if isFlatIndex(indexType) || meta.NumRows < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() {
|
||||
log.Ctx(ib.ctx).Info("segment does not need index really", zap.Int64("buildID", buildID),
|
||||
zap.Int64("segmentID", meta.SegmentID), zap.Int64("num rows", meta.NumRows))
|
||||
|
@ -333,7 +333,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
|
|||
|
||||
fieldID := ib.meta.indexMeta.GetFieldIDByIndexID(meta.CollectionID, meta.IndexID)
|
||||
binlogIDs := getBinLogIds(segment, fieldID)
|
||||
if isDiskANNIndex(getIndexType(indexParams)) {
|
||||
if isDiskANNIndex(GetIndexType(indexParams)) {
|
||||
var err error
|
||||
indexParams, err = indexparams.UpdateDiskIndexBuildParams(Params, indexParams)
|
||||
if err != nil {
|
||||
|
|
|
@ -225,7 +225,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
|
|||
metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
if getIndexType(req.GetIndexParams()) == indexparamcheck.IndexDISKANN && !s.indexNodeManager.ClientSupportDisk() {
|
||||
if GetIndexType(req.GetIndexParams()) == indexparamcheck.IndexDISKANN && !s.indexNodeManager.ClientSupportDisk() {
|
||||
errMsg := "all IndexNodes do not support disk indexes, please verify"
|
||||
log.Warn(errMsg)
|
||||
err = merr.WrapErrIndexNotSupported(indexparamcheck.IndexDISKANN)
|
||||
|
@ -270,7 +270,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
|
|||
func ValidateIndexParams(index *model.Index, key, value string) error {
|
||||
switch key {
|
||||
case common.MmapEnabledKey:
|
||||
indexType := getIndexType(index.IndexParams)
|
||||
indexType := GetIndexType(index.IndexParams)
|
||||
if !indexparamcheck.IsMmapSupported(indexType) {
|
||||
return merr.WrapErrParameterInvalidMsg("index type %s does not support mmap", indexType)
|
||||
}
|
||||
|
|
|
@ -182,7 +182,7 @@ func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, err
|
|||
return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), nil
|
||||
}
|
||||
|
||||
func getIndexType(indexParams []*commonpb.KeyValuePair) string {
|
||||
func GetIndexType(indexParams []*commonpb.KeyValuePair) string {
|
||||
for _, param := range indexParams {
|
||||
if param.Key == common.IndexTypeKey {
|
||||
return param.Value
|
||||
|
|
|
@ -32,7 +32,9 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/datacoord"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
|
||||
|
@ -73,7 +75,14 @@ func (li *LoadIndexInfo) appendLoadIndexInfo(ctx context.Context, indexInfo *que
|
|||
indexPaths := indexInfo.IndexFilePaths
|
||||
|
||||
indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams)
|
||||
enableMmap := indexParams[common.MmapEnabledKey] == "true"
|
||||
|
||||
enableMmap := common.IsMmapEnabled(indexInfo.IndexParams...)
|
||||
if !enableMmap {
|
||||
_, ok := indexParams[common.MmapEnabledKey]
|
||||
indexType := datacoord.GetIndexType(indexInfo.IndexParams)
|
||||
indexSupportMmap := indexparamcheck.IsMmapSupported(indexType)
|
||||
enableMmap = !ok && params.Params.QueryNodeCfg.MmapEnabled.GetAsBool() && indexSupportMmap
|
||||
}
|
||||
// as Knowhere reports error if encounter a unknown param, we need to delete it
|
||||
delete(indexParams, common.MmapEnabledKey)
|
||||
|
||||
|
|
|
@ -46,6 +46,7 @@ import (
|
|||
"github.com/milvus-io/milvus-storage/go/storage/options"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
typeutil_internal "github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
@ -1074,7 +1075,8 @@ func loadSealedSegmentFields(ctx context.Context, collection *Collection, segmen
|
|||
opts := opts
|
||||
fieldBinLog := field
|
||||
fieldID := field.FieldID
|
||||
mmapEnabled := common.IsFieldMmapEnabled(collection.Schema(), fieldID)
|
||||
mmapEnabled := common.IsFieldMmapEnabled(collection.Schema(), fieldID) ||
|
||||
(!common.FieldHasMmapKey(collection.Schema(), fieldID) && params.Params.QueryNodeCfg.MmapEnabled.GetAsBool())
|
||||
if mmapEnabled && options.LoadStatus == LoadStatusInMemory {
|
||||
opts = append(opts, WithLoadStatus(LoadStatusMapped))
|
||||
}
|
||||
|
|
|
@ -159,6 +159,20 @@ func IsFieldMmapEnabled(schema *schemapb.CollectionSchema, fieldID int64) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func FieldHasMmapKey(schema *schemapb.CollectionSchema, fieldID int64) bool {
|
||||
for _, field := range schema.GetFields() {
|
||||
if field.GetFieldID() == fieldID {
|
||||
for _, kv := range field.GetTypeParams() {
|
||||
if kv.Key == MmapEnabledKey {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func IsCollectionLazyLoadEnabled(kvs ...*commonpb.KeyValuePair) bool {
|
||||
for _, kv := range kvs {
|
||||
if kv.Key == LazyLoadEnableKey && strings.ToLower(kv.Value) == "true" {
|
||||
|
|
|
@ -1979,6 +1979,7 @@ type queryNodeConfig struct {
|
|||
CacheEnabled ParamItem `refreshable:"false"`
|
||||
CacheMemoryLimit ParamItem `refreshable:"false"`
|
||||
MmapDirPath ParamItem `refreshable:"false"`
|
||||
MmapEnabled ParamItem `refreshable:"false"`
|
||||
|
||||
// chunk cache
|
||||
ReadAheadPolicy ParamItem `refreshable:"false"`
|
||||
|
@ -2200,6 +2201,14 @@ func (p *queryNodeConfig) init(base *BaseTable) {
|
|||
}
|
||||
p.MmapDirPath.Init(base.mgr)
|
||||
|
||||
p.MmapEnabled = ParamItem{
|
||||
Key: "queryNode.mmapEnabled",
|
||||
Version: "2.4.0",
|
||||
DefaultValue: "false",
|
||||
Doc: "Enable mmap for loading data",
|
||||
}
|
||||
p.MmapEnabled.Init(base.mgr)
|
||||
|
||||
p.ReadAheadPolicy = ParamItem{
|
||||
Key: "queryNode.cache.readAheadPolicy",
|
||||
Version: "2.3.2",
|
||||
|
|
Loading…
Reference in New Issue