enhance: add resource usage estimate for segment interface (#31050)

issue: #30931

- move the resource estimate function out of the segment loader.
- add load info and collection to base segment.
- add resource usage method for sealed segment.

Signed-off-by: chyezh <chyezh@outlook.com>
pull/31391/head
chyezh 2024-03-19 11:53:05 +08:00 committed by GitHub
parent 3e7e9f15cd
commit 8e293dc1ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 399 additions and 329 deletions

View File

@ -87,18 +87,22 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) {
growing := sd.segmentManager.GetGrowing(segmentID)
if growing == nil {
var err error
// TODO: It's a weird implementation that growing segments have load info.
// we should separate the growing segment and sealed segment by type system.
growing, err = segments.NewSegment(
context.Background(),
sd.collection,
segmentID,
insertData.PartitionID,
sd.collectionID,
sd.vchannelName,
segments.SegmentTypeGrowing,
0,
insertData.StartPosition,
insertData.StartPosition,
datapb.SegmentLevel_L1,
&querypb.SegmentLoadInfo{
SegmentID: segmentID,
PartitionID: insertData.PartitionID,
CollectionID: sd.collectionID,
InsertChannel: sd.vchannelName,
StartPosition: insertData.StartPosition,
DeltaPosition: insertData.StartPosition,
Level: datapb.SegmentLevel_L1,
},
)
if err != nil {
log.Error("failed to create new segment",

View File

@ -32,10 +32,16 @@ import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
// indexAttrCache is the process-wide singleton holding calculated index attributes.
var indexAttrCache = NewIndexAttrCache()

// getIndexAttrCache uses a singleton to store the index meta cache, so the
// (potentially expensive) attribute calculations are shared across callers.
func getIndexAttrCache() *IndexAttrCache {
	return indexAttrCache
}
// IndexAttrCache index meta cache stores calculated attribute.
type IndexAttrCache struct {
loadWithDisk *typeutil.ConcurrentMap[typeutil.Pair[string, int32], bool]
@ -48,7 +54,7 @@ func NewIndexAttrCache() *IndexAttrCache {
}
}
func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo) (memory uint64, disk uint64, err error) {
func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo, memoryIndexLoadPredictMemoryUsageFactor float64) (memory uint64, disk uint64, err error) {
indexType, err := funcutil.GetAttrByKeyFromRepeatedKV(common.IndexTypeKey, indexInfo.IndexParams)
if err != nil {
return 0, 0, fmt.Errorf("index type not exist in index params")
@ -79,7 +85,7 @@ func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo
factor := float64(1)
diskUsage := uint64(0)
if !isLoadWithDisk {
factor = paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()
factor = memoryIndexLoadPredictMemoryUsageFactor
} else {
diskUsage = uint64(indexInfo.IndexSize)
}

View File

@ -51,7 +51,7 @@ func (s *IndexAttrCacheSuite) TestCacheMissing() {
CurrentIndexVersion: 0,
}
_, _, err := s.c.GetIndexResourceUsage(info)
_, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
s.Require().NoError(err)
_, has := s.c.loadWithDisk.Get(typeutil.NewPair[string, int32]("test", 0))
@ -67,7 +67,7 @@ func (s *IndexAttrCacheSuite) TestDiskANN() {
IndexSize: 100,
}
memory, disk, err := s.c.GetIndexResourceUsage(info)
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
s.Require().NoError(err)
_, has := s.c.loadWithDisk.Get(typeutil.NewPair[string, int32](indexparamcheck.IndexDISKANN, 0))
@ -88,7 +88,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() {
s.Run("load_with_disk", func() {
s.c.loadWithDisk.Insert(typeutil.NewPair[string, int32]("test", 0), true)
memory, disk, err := s.c.GetIndexResourceUsage(info)
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
s.Require().NoError(err)
s.EqualValues(100, memory)
@ -97,7 +97,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() {
s.Run("load_with_disk", func() {
s.c.loadWithDisk.Insert(typeutil.NewPair[string, int32]("test", 0), false)
memory, disk, err := s.c.GetIndexResourceUsage(info)
memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
s.Require().NoError(err)
s.Equal(uint64(250), memory)
@ -109,7 +109,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() {
IndexParams: []*commonpb.KeyValuePair{},
}
_, _, err := s.c.GetIndexResourceUsage(info)
_, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat())
s.Error(err)
})
}

View File

@ -47,15 +47,15 @@ func (s *ManagerSuite) SetupTest() {
segment, err := NewSegment(
context.Background(),
NewCollection(s.collectionIDs[i], schema, GenTestIndexMeta(s.collectionIDs[i], schema), querypb.LoadType_LoadCollection),
id,
s.partitionIDs[i],
s.collectionIDs[i],
s.channels[i],
s.types[i],
0,
nil,
nil,
s.levels[i],
&querypb.SegmentLoadInfo{
SegmentID: id,
PartitionID: s.partitionIDs[i],
CollectionID: s.collectionIDs[i],
InsertChannel: s.channels[i],
Level: s.levels[i],
},
)
s.Require().NoError(err)
s.segments = append(s.segments, segment)

View File

@ -459,47 +459,6 @@ func (_c *MockSegment_InsertCount_Call) RunAndReturn(run func() int64) *MockSegm
return _c
}
// IsLazyLoad provides a mock function with given fields:
func (_m *MockSegment) IsLazyLoad() bool {
	ret := _m.Called()

	// Prefer a registered return function; otherwise fall back to the
	// recorded static value.
	if returnFunc, ok := ret.Get(0).(func() bool); ok {
		return returnFunc()
	}
	return ret.Get(0).(bool)
}
// MockSegment_IsLazyLoad_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsLazyLoad'
type MockSegment_IsLazyLoad_Call struct {
	*mock.Call
}
// IsLazyLoad is a helper method to define mock.On call
func (_e *MockSegment_Expecter) IsLazyLoad() *MockSegment_IsLazyLoad_Call {
	call := _e.mock.On("IsLazyLoad")
	return &MockSegment_IsLazyLoad_Call{Call: call}
}
// Run registers a callback executed whenever IsLazyLoad is invoked on the mock.
func (_c *MockSegment_IsLazyLoad_Call) Run(run func()) *MockSegment_IsLazyLoad_Call {
	// IsLazyLoad takes no arguments, so the mock.Arguments are discarded.
	wrapper := func(args mock.Arguments) {
		run()
	}
	_c.Call.Run(wrapper)
	return _c
}
// Return sets the value the mocked IsLazyLoad reports; returns the call for chaining.
func (_c *MockSegment_IsLazyLoad_Call) Return(_a0 bool) *MockSegment_IsLazyLoad_Call {
	_c.Call.Return(_a0)
	return _c
}
// RunAndReturn sets a function whose result is used as IsLazyLoad's return value.
func (_c *MockSegment_IsLazyLoad_Call) RunAndReturn(run func() bool) *MockSegment_IsLazyLoad_Call {
	_c.Call.Return(run)
	return _c
}
// LastDeltaTimestamp provides a mock function with given fields:
func (_m *MockSegment) LastDeltaTimestamp() uint64 {
ret := _m.Called()
@ -952,6 +911,47 @@ func (_c *MockSegment_Release_Call) RunAndReturn(run func(...releaseOption)) *Mo
return _c
}
// ResourceUsageEstimate provides a mock function with given fields:
func (_m *MockSegment) ResourceUsageEstimate() ResourceUsage {
	ret := _m.Called()

	// Prefer a registered return function; otherwise fall back to the
	// recorded static value.
	if returnFunc, ok := ret.Get(0).(func() ResourceUsage); ok {
		return returnFunc()
	}
	return ret.Get(0).(ResourceUsage)
}
// MockSegment_ResourceUsageEstimate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ResourceUsageEstimate'
type MockSegment_ResourceUsageEstimate_Call struct {
	*mock.Call
}
// ResourceUsageEstimate is a helper method to define mock.On call
func (_e *MockSegment_Expecter) ResourceUsageEstimate() *MockSegment_ResourceUsageEstimate_Call {
	call := _e.mock.On("ResourceUsageEstimate")
	return &MockSegment_ResourceUsageEstimate_Call{Call: call}
}
// Run registers a callback executed whenever ResourceUsageEstimate is invoked on the mock.
func (_c *MockSegment_ResourceUsageEstimate_Call) Run(run func()) *MockSegment_ResourceUsageEstimate_Call {
	// ResourceUsageEstimate takes no arguments, so the mock.Arguments are discarded.
	wrapper := func(args mock.Arguments) {
		run()
	}
	_c.Call.Run(wrapper)
	return _c
}
// Return sets the ResourceUsage the mocked method reports; returns the call for chaining.
func (_c *MockSegment_ResourceUsageEstimate_Call) Return(_a0 ResourceUsage) *MockSegment_ResourceUsageEstimate_Call {
	_c.Call.Return(_a0)
	return _c
}
// RunAndReturn sets a function whose result is used as ResourceUsageEstimate's return value.
func (_c *MockSegment_ResourceUsageEstimate_Call) RunAndReturn(run func() ResourceUsage) *MockSegment_ResourceUsageEstimate_Call {
	_c.Call.Return(run)
	return _c
}
// Retrieve provides a mock function with given fields: ctx, plan
func (_m *MockSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) {
ret := _m.Called(ctx, plan)

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/planpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type PlanSuite struct {
@ -78,5 +79,6 @@ func (suite *PlanSuite) TestQueryPlanCollectionReleased() {
}
func TestPlan(t *testing.T) {
paramtable.Init()
suite.Run(t, new(PlanSuite))
}

View File

@ -75,15 +75,15 @@ func (suite *ReduceSuite) SetupTest() {
)
suite.segment, err = NewSegment(ctx,
suite.collection,
suite.segmentID,
suite.partitionID,
suite.collectionID,
"dml",
SegmentTypeSealed,
0,
nil,
nil,
datapb.SegmentLevel_Legacy,
&querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
Level: datapb.SegmentLevel_Legacy,
},
)
suite.Require().NoError(err)

View File

@ -85,15 +85,15 @@ func (suite *RetrieveSuite) SetupTest() {
suite.sealed, err = NewSegment(ctx,
suite.collection,
suite.segmentID,
suite.partitionID,
suite.collectionID,
"dml",
SegmentTypeSealed,
0,
nil,
nil,
datapb.SegmentLevel_Legacy,
&querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
Level: datapb.SegmentLevel_Legacy,
},
)
suite.Require().NoError(err)
@ -113,15 +113,15 @@ func (suite *RetrieveSuite) SetupTest() {
suite.growing, err = NewSegment(ctx,
suite.collection,
suite.segmentID+1,
suite.partitionID,
suite.collectionID,
"dml",
SegmentTypeGrowing,
0,
nil,
nil,
datapb.SegmentLevel_Legacy,
&querypb.SegmentLoadInfo{
SegmentID: suite.segmentID + 1,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
Level: datapb.SegmentLevel_Legacy,
},
)
suite.Require().NoError(err)

View File

@ -76,15 +76,15 @@ func (suite *SearchSuite) SetupTest() {
suite.sealed, err = NewSegment(ctx,
suite.collection,
suite.segmentID,
suite.partitionID,
suite.collectionID,
"dml",
SegmentTypeSealed,
0,
nil,
nil,
datapb.SegmentLevel_Legacy,
&querypb.SegmentLoadInfo{
SegmentID: suite.segmentID,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
Level: datapb.SegmentLevel_Legacy,
},
)
suite.Require().NoError(err)
@ -104,15 +104,15 @@ func (suite *SearchSuite) SetupTest() {
suite.growing, err = NewSegment(ctx,
suite.collection,
suite.segmentID+1,
suite.partitionID,
suite.collectionID,
"dml",
SegmentTypeGrowing,
0,
nil,
nil,
datapb.SegmentLevel_Legacy,
&querypb.SegmentLoadInfo{
SegmentID: suite.segmentID + 1,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
Level: datapb.SegmentLevel_Legacy,
},
)
suite.Require().NoError(err)

View File

@ -77,67 +77,61 @@ type IndexedFieldInfo struct {
}
type baseSegment struct {
segmentID int64
partitionID int64
shard string
collectionID int64
typ SegmentType
level datapb.SegmentLevel
version *atomic.Int64
collection *Collection
version *atomic.Int64
// the load status of the segment,
// only transitions below are allowed:
// 1. LoadStatusMeta <-> LoadStatusMapped
// 2. LoadStatusMeta <-> LoadStatusInMemory
loadStatus *atomic.String
isLazyLoad bool
startPosition *msgpb.MsgPosition // for growing segment release
segmentType SegmentType
bloomFilterSet *pkoracle.BloomFilterSet
loadInfo *querypb.SegmentLoadInfo
resourceUsageCache *atomic.Pointer[ResourceUsage]
}
func newBaseSegment(id, partitionID, collectionID int64, shard string, typ SegmentType, level datapb.SegmentLevel, version int64, startPosition *msgpb.MsgPosition) baseSegment {
func newBaseSegment(collection *Collection, segmentType SegmentType, version int64, loadInfo *querypb.SegmentLoadInfo) baseSegment {
return baseSegment{
segmentID: id,
partitionID: partitionID,
collectionID: collectionID,
shard: shard,
typ: typ,
level: level,
collection: collection,
loadInfo: loadInfo,
version: atomic.NewInt64(version),
loadStatus: atomic.NewString(string(LoadStatusMeta)),
startPosition: startPosition,
bloomFilterSet: pkoracle.NewBloomFilterSet(id, partitionID, typ),
segmentType: segmentType,
bloomFilterSet: pkoracle.NewBloomFilterSet(loadInfo.GetSegmentID(), loadInfo.GetPartitionID(), segmentType),
resourceUsageCache: atomic.NewPointer[ResourceUsage](nil),
}
}
// ID returns the identity number.
func (s *baseSegment) ID() int64 {
return s.segmentID
return s.loadInfo.GetSegmentID()
}
func (s *baseSegment) Collection() int64 {
return s.collectionID
return s.loadInfo.GetCollectionID()
}
func (s *baseSegment) Partition() int64 {
return s.partitionID
return s.loadInfo.GetPartitionID()
}
func (s *baseSegment) Shard() string {
return s.shard
return s.loadInfo.GetInsertChannel()
}
func (s *baseSegment) Type() SegmentType {
return s.typ
return s.segmentType
}
func (s *baseSegment) Level() datapb.SegmentLevel {
return s.level
return s.loadInfo.GetLevel()
}
func (s *baseSegment) StartPosition() *msgpb.MsgPosition {
return s.startPosition
return s.loadInfo.GetStartPosition()
}
func (s *baseSegment) Version() int64 {
@ -152,11 +146,11 @@ func (s *baseSegment) LoadStatus() LoadStatus {
return LoadStatus(s.loadStatus.Load())
}
func (s *baseSegment) IsLazyLoad() bool {
return s.isLazyLoad
}
// LoadInfo returns the SegmentLoadInfo the segment was constructed with,
// or nil for growing segments.
func (s *baseSegment) LoadInfo() *querypb.SegmentLoadInfo {
	if s.segmentType == SegmentTypeGrowing {
		// Growing segments do not have load info.
		return nil
	}
	return s.loadInfo
}
@ -171,6 +165,32 @@ func (s *baseSegment) MayPkExist(pk storage.PrimaryKey) bool {
return s.bloomFilterSet.MayPkExist(pk)
}
// ResourceUsageEstimate returns the estimated resource usage of the segment.
// The estimate is computed lazily on first call and cached in
// resourceUsageCache; growing segments always report a zero ResourceUsage.
func (s *baseSegment) ResourceUsageEstimate() ResourceUsage {
	if s.segmentType == SegmentTypeGrowing {
		// Growing segment cannot do resource usage estimate.
		return ResourceUsage{}
	}

	// Fast path: reuse a previously computed estimate.
	cache := s.resourceUsageCache.Load()
	if cache != nil {
		return *cache
	}

	// NOTE(review): factors of 1.0 and a disabled temp-segment index suggest this
	// estimates the steady-state (post-load) footprint rather than the transient
	// loading cost — confirm against getResourceUsageEstimateOfSegment.
	usage, err := getResourceUsageEstimateOfSegment(s.collection.Schema(), s.loadInfo, resourceEstimateFactor{
		memoryUsageFactor:        1.0,
		memoryIndexUsageFactor:   1.0,
		enableTempSegmentIndex:   false,
		deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(),
	})
	if err != nil {
		// Should never fail; if it did, the segment should never have been loaded.
		log.Warn("unreachable: failed to get resource usage estimate of segment", zap.Error(err), zap.Int64("collectionID", s.Collection()), zap.Int64("segmentID", s.ID()))
		return ResourceUsage{}
	}

	// Benign race: concurrent first callers may each compute and store the same
	// estimate; the last write wins and all results are equivalent.
	s.resourceUsageCache.Store(usage)
	return *usage
}
type FieldInfo struct {
datapb.FieldBinlog
RowCount int64
@ -197,23 +217,17 @@ type LocalSegment struct {
func NewSegment(ctx context.Context,
collection *Collection,
segmentID int64,
partitionID int64,
collectionID int64,
shard string,
segmentType SegmentType,
version int64,
startPosition *msgpb.MsgPosition,
deltaPosition *msgpb.MsgPosition,
level datapb.SegmentLevel,
loadInfo *querypb.SegmentLoadInfo,
) (Segment, error) {
log := log.Ctx(ctx)
/*
CStatus
NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type, CSegmentInterface* newSegment);
*/
if level == datapb.SegmentLevel_L0 {
return NewL0Segment(collection, segmentID, partitionID, collectionID, shard, segmentType, version, startPosition, deltaPosition)
if loadInfo.GetLevel() == datapb.SegmentLevel_L0 {
return NewL0Segment(collection, segmentType, version, loadInfo)
}
var cSegType C.SegmentType
switch segmentType {
@ -222,16 +236,16 @@ func NewSegment(ctx context.Context,
case SegmentTypeGrowing:
cSegType = C.Growing
default:
return nil, fmt.Errorf("illegal segment type %d when create segment %d", segmentType, segmentID)
return nil, fmt.Errorf("illegal segment type %d when create segment %d", segmentType, loadInfo.GetSegmentID())
}
var newPtr C.CSegmentInterface
_, err := GetDynamicPool().Submit(func() (any, error) {
status := C.NewSegment(collection.collectionPtr, cSegType, C.int64_t(segmentID), &newPtr)
status := C.NewSegment(collection.collectionPtr, cSegType, C.int64_t(loadInfo.GetSegmentID()), &newPtr)
err := HandleCStatus(ctx, &status, "NewSegmentFailed",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.Int64("collectionID", loadInfo.GetCollectionID()),
zap.Int64("partitionID", loadInfo.GetPartitionID()),
zap.Int64("segmentID", loadInfo.GetSegmentID()),
zap.String("segmentType", segmentType.String()))
return nil, err
}).Await()
@ -240,15 +254,15 @@ func NewSegment(ctx context.Context,
}
log.Info("create segment",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.Int64("collectionID", loadInfo.GetCollectionID()),
zap.Int64("partitionID", loadInfo.GetPartitionID()),
zap.Int64("segmentID", loadInfo.GetSegmentID()),
zap.String("segmentType", segmentType.String()),
zap.String("level", level.String()),
zap.String("level", loadInfo.GetLevel().String()),
)
segment := &LocalSegment{
baseSegment: newBaseSegment(segmentID, partitionID, collectionID, shard, segmentType, level, version, startPosition),
baseSegment: newBaseSegment(collection, segmentType, version, loadInfo),
ptr: newPtr,
lastDeltaTimestamp: atomic.NewUint64(0),
fields: typeutil.NewConcurrentMap[int64, *FieldInfo](),
@ -269,33 +283,26 @@ func NewSegment(ctx context.Context,
func NewSegmentV2(
ctx context.Context,
collection *Collection,
segmentID int64,
partitionID int64,
collectionID int64,
shard string,
segmentType SegmentType,
version int64,
startPosition *msgpb.MsgPosition,
deltaPosition *msgpb.MsgPosition,
storageVersion int64,
level datapb.SegmentLevel,
loadInfo *querypb.SegmentLoadInfo,
) (Segment, error) {
/*
CSegmentInterface
NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type);
*/
if level == datapb.SegmentLevel_L0 {
return NewL0Segment(collection, segmentID, partitionID, collectionID, shard, segmentType, version, startPosition, deltaPosition)
if loadInfo.GetLevel() == datapb.SegmentLevel_L0 {
return NewL0Segment(collection, segmentType, version, loadInfo)
}
var segmentPtr C.CSegmentInterface
var status C.CStatus
switch segmentType {
case SegmentTypeSealed:
status = C.NewSegment(collection.collectionPtr, C.Sealed, C.int64_t(segmentID), &segmentPtr)
status = C.NewSegment(collection.collectionPtr, C.Sealed, C.int64_t(loadInfo.GetSegmentID()), &segmentPtr)
case SegmentTypeGrowing:
status = C.NewSegment(collection.collectionPtr, C.Growing, C.int64_t(segmentID), &segmentPtr)
status = C.NewSegment(collection.collectionPtr, C.Growing, C.int64_t(loadInfo.GetSegmentID()), &segmentPtr)
default:
return nil, fmt.Errorf("illegal segment type %d when create segment %d", segmentType, segmentID)
return nil, fmt.Errorf("illegal segment type %d when create segment %d", segmentType, loadInfo.GetSegmentID())
}
if err := HandleCStatus(ctx, &status, "NewSegmentFailed"); err != nil {
@ -303,22 +310,22 @@ func NewSegmentV2(
}
log.Info("create segment",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.Int64("collectionID", loadInfo.GetCollectionID()),
zap.Int64("partitionID", loadInfo.GetPartitionID()),
zap.Int64("segmentID", loadInfo.GetSegmentID()),
zap.String("segmentType", segmentType.String()))
url, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), segmentID)
url, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), loadInfo.GetSegmentID())
if err != nil {
return nil, err
}
space, err := milvus_storage.Open(url, options.NewSpaceOptionBuilder().SetVersion(storageVersion).Build())
space, err := milvus_storage.Open(url, options.NewSpaceOptionBuilder().SetVersion(loadInfo.GetStorageVersion()).Build())
if err != nil {
return nil, err
}
segment := &LocalSegment{
baseSegment: newBaseSegment(segmentID, partitionID, collectionID, shard, segmentType, level, version, startPosition),
baseSegment: newBaseSegment(collection, segmentType, version, loadInfo),
ptr: segmentPtr,
lastDeltaTimestamp: atomic.NewUint64(0),
fields: typeutil.NewConcurrentMap[int64, *FieldInfo](),
@ -461,13 +468,13 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S
log := log.Ctx(ctx).With(
zap.Int64("collectionID", s.Collection()),
zap.Int64("segmentID", s.ID()),
zap.String("segmentType", s.typ.String()),
zap.String("segmentType", s.segmentType.String()),
)
s.ptrLock.RLock()
defer s.ptrLock.RUnlock()
if s.ptr == nil {
return nil, merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
traceCtx := ParseCTraceContext(ctx)
@ -493,7 +500,7 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S
if err := HandleCStatus(ctx, &status, "Search failed",
zap.Int64("collectionID", s.Collection()),
zap.Int64("segmentID", s.ID()),
zap.String("segmentType", s.typ.String())); err != nil {
zap.String("segmentType", s.segmentType.String())); err != nil {
return nil, err
}
log.Debug("search segment done")
@ -505,7 +512,7 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco
defer s.ptrLock.RUnlock()
if s.ptr == nil {
return nil, merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
log := log.Ctx(ctx).With(
@ -513,7 +520,7 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("msgID", plan.msgID),
zap.String("segmentType", s.typ.String()),
zap.String("segmentType", s.segmentType.String()),
)
traceCtx := ParseCTraceContext(ctx)
@ -542,7 +549,7 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.Int64("msgID", plan.msgID),
zap.String("segmentType", s.typ.String())); err != nil {
zap.String("segmentType", s.segmentType.String())); err != nil {
return nil, err
}
@ -595,14 +602,14 @@ func (s *LocalSegment) preInsert(ctx context.Context, numOfRecords int) (int64,
func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error {
if s.Type() != SegmentTypeGrowing {
return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.typ.String())
return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String())
}
s.ptrLock.RLock()
defer s.ptrLock.RUnlock()
if s.ptr == nil {
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
offset, err := s.preInsert(ctx, len(rowIDs))
@ -662,7 +669,7 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary
defer s.ptrLock.RUnlock()
if s.ptr == nil {
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
cOffset := C.int64_t(0) // depre
@ -728,7 +735,7 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context, rowCount int64, f
defer s.ptrLock.RUnlock()
if s.ptr == nil {
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
log := log.Ctx(ctx).With(
@ -763,7 +770,7 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context, rowCount int64, f
var status C.CStatus
GetLoadPool().Submit(func() (any, error) {
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.segmentID)
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.ID())
if err != nil {
return nil, err
}
@ -831,11 +838,11 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun
s.ptrLock.RLock()
defer s.ptrLock.RUnlock()
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.segmentID, fieldID))
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.ID(), fieldID))
defer sp.End()
if s.ptr == nil {
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
log := log.Ctx(ctx).With(
@ -875,7 +882,7 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun
GetLoadPool().Submit(func() (any, error) {
log.Info("submitted loadFieldData task to load pool")
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.segmentID)
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.ID())
if err != nil {
return nil, err
}
@ -996,7 +1003,7 @@ func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fie
defer s.ptrLock.RUnlock()
if s.ptr == nil {
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
log := log.Ctx(ctx).With(
@ -1051,7 +1058,7 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
defer s.ptrLock.RUnlock()
if s.ptr == nil {
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
log := log.Ctx(ctx).With(
@ -1137,7 +1144,7 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
return nil
}
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadIndex-%d-%d", s.segmentID, indexInfo.GetFieldID()))
ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadIndex-%d-%d", s.ID(), indexInfo.GetFieldID()))
defer sp.End()
log := log.Ctx(ctx).With(
@ -1155,14 +1162,14 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
defer deleteLoadIndexInfo(loadIndexInfo)
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.segmentID)
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.ID())
if err != nil {
return err
}
loadIndexInfo.appendStorageInfo(uri, indexInfo.IndexStoreVersion)
}
err = loadIndexInfo.appendLoadIndexInfo(ctx, indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType)
err = loadIndexInfo.appendLoadIndexInfo(ctx, indexInfo, s.Collection(), s.Partition(), s.ID(), fieldType)
if err != nil {
if loadIndexInfo.cleanLocalData(ctx) != nil {
log.Warn("failed to clean cached data on disk after append index failed",
@ -1172,7 +1179,7 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn
return err
}
if s.Type() != SegmentTypeSealed {
errMsg := fmt.Sprintln("updateSegmentIndex failed, illegal segment type ", s.typ, "segmentID = ", s.ID())
errMsg := fmt.Sprintln("updateSegmentIndex failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID())
return errors.New(errMsg)
}
@ -1199,7 +1206,7 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F
defer s.ptrLock.RUnlock()
if s.ptr == nil {
return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released")
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
}
var status C.CStatus
@ -1368,10 +1375,10 @@ func (s *LocalSegment) Release(opts ...releaseOption) {
}
log.Info("delete segment from memory",
zap.Int64("collectionID", s.collectionID),
zap.Int64("partitionID", s.partitionID),
zap.Int64("collectionID", s.Collection()),
zap.Int64("partitionID", s.Partition()),
zap.Int64("segmentID", s.ID()),
zap.String("segmentType", s.typ.String()),
zap.String("segmentType", s.segmentType.String()),
zap.Int64("insertCount", s.InsertCount()),
)
}

View File

@ -35,7 +35,19 @@ const (
LoadStatusInMemory LoadStatus = "in_memory"
)
// ResourceUsage is used to estimate the resource usage of a sealed segment.
type ResourceUsage struct {
	MemorySize     uint64 // estimated bytes of memory occupied when loaded
	DiskSize       uint64 // estimated bytes on disk (e.g. disk index files)
	MmapFieldCount int    // count of mmapped fields — presumed from name; TODO confirm
}
// Segment is the interface of a segment implementation.
// Some methods cannot apply to all segment types, such as LoadInfo and ResourceUsageEstimate.
// Add more interface to represent different segment types is a better implementation.
type Segment interface {
// ResourceUsageEstimate() ResourceUsage
// Properties
ID() int64
Collection() int64
@ -47,7 +59,6 @@ type Segment interface {
Type() SegmentType
Level() datapb.SegmentLevel
LoadStatus() LoadStatus
IsLazyLoad() bool
LoadInfo() *querypb.SegmentLoadInfo
RLock() error
RUnlock()
@ -58,6 +69,8 @@ type Segment interface {
// RowNum returns the number of rows, it's slow, so DO NOT call it in a loop
RowNum() int64
MemSize() int64
// ResourceUsageEstimate returns the estimated resource usage of the segment
ResourceUsageEstimate() ResourceUsage
// Index related
GetIndex(fieldID int64) *IndexedFieldInfo

View File

@ -21,8 +21,8 @@ import (
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
storage "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
@ -41,14 +41,9 @@ type L0Segment struct {
}
func NewL0Segment(collection *Collection,
segmentID int64,
partitionID int64,
collectionID int64,
shard string,
segmentType SegmentType,
version int64,
startPosition *msgpb.MsgPosition,
deltaPosition *msgpb.MsgPosition,
loadInfo *querypb.SegmentLoadInfo,
) (Segment, error) {
/*
CSegmentInterface
@ -56,13 +51,13 @@ func NewL0Segment(collection *Collection,
*/
log.Info("create L0 segment",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.Int64("collectionID", loadInfo.GetCollectionID()),
zap.Int64("partitionID", loadInfo.GetPartitionID()),
zap.Int64("segmentID", loadInfo.GetSegmentID()),
zap.String("segmentType", segmentType.String()))
segment := &L0Segment{
baseSegment: newBaseSegment(segmentID, partitionID, collectionID, shard, segmentType, datapb.SegmentLevel_L0, version, startPosition),
baseSegment: newBaseSegment(collection, segmentType, version, loadInfo),
}
// level 0 segments are always in memory
@ -119,7 +114,7 @@ func (s *L0Segment) Indexes() []*IndexedFieldInfo {
}
func (s *L0Segment) Type() SegmentType {
return s.typ
return s.segmentType
}
func (s *L0Segment) Level() datapb.SegmentLevel {

View File

@ -95,6 +95,14 @@ func (r *LoadResource) Sub(resource LoadResource) {
r.DiskSize -= resource.DiskSize
}
// resourceEstimateFactor bundles the multipliers applied when estimating a
// segment's resource usage. Field meanings inferred from names — confirm
// against getResourceUsageEstimateOfSegment.
type resourceEstimateFactor struct {
	memoryUsageFactor        float64 // multiplier on raw data memory size
	memoryIndexUsageFactor   float64 // multiplier on in-memory index size
	enableTempSegmentIndex   bool    // whether a temporary segment index is accounted for
	tempSegmentIndexFactor   float64 // extra multiplier when the temp index is enabled
	deltaDataExpansionFactor float64 // expansion rate applied to delta (delete) data
}
type segmentLoaderV2 struct {
*segmentLoader
}
@ -164,29 +172,26 @@ func (loader *segmentLoaderV2) Load(ctx context.Context,
}()
for _, info := range infos {
segmentID := info.GetSegmentID()
partitionID := info.GetPartitionID()
collectionID := info.GetCollectionID()
shard := info.GetInsertChannel()
loadInfo := info
collection := loader.manager.Collection.Get(collectionID)
collection := loader.manager.Collection.Get(loadInfo.GetCollectionID())
if collection == nil {
err := merr.WrapErrCollectionNotFound(collectionID)
err := merr.WrapErrCollectionNotFound(loadInfo.GetCollectionID())
log.Warn("failed to get collection", zap.Error(err))
return nil, err
}
segment, err := NewSegmentV2(ctx, collection, segmentID, partitionID, collectionID, shard, segmentType, version, info.GetStartPosition(), info.GetDeltaPosition(), info.GetStorageVersion(), info.GetLevel())
segment, err := NewSegmentV2(ctx, collection, segmentType, version, loadInfo)
if err != nil {
log.Warn("load segment failed when create new segment",
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.Int64("partitionID", loadInfo.GetPartitionID()),
zap.Int64("segmentID", loadInfo.GetSegmentID()),
zap.Error(err),
)
return nil, err
}
newSegments.Insert(segmentID, segment)
newSegments.Insert(loadInfo.GetSegmentID(), segment)
}
loadSegmentFunc := func(idx int) error {
@ -439,10 +444,10 @@ func (loader *segmentLoaderV2) loadSegment(ctx context.Context,
}
// load statslog if it's growing segment
if segment.typ == SegmentTypeGrowing {
if segment.segmentType == SegmentTypeGrowing {
log.Info("loading statslog...")
// pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID())
err := loader.loadBloomFilter(ctx, segment.segmentID, segment.bloomFilterSet, loadInfo.StorageVersion)
err := loader.loadBloomFilter(ctx, segment.ID(), segment.bloomFilterSet, loadInfo.StorageVersion)
if err != nil {
return err
}
@ -467,8 +472,8 @@ func (loader *segmentLoaderV2) loadSealedSegmentFields(ctx context.Context, segm
}
log.Ctx(ctx).Info("load field binlogs done for sealed segment",
zap.Int64("collection", segment.collectionID),
zap.Int64("segment", segment.segmentID),
zap.Int64("collection", segment.Collection()),
zap.Int64("segment", segment.ID()),
zap.String("segmentType", segment.Type().String()))
return nil
@ -496,7 +501,6 @@ func NewLoader(
log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))
loader := &segmentLoader{
IndexAttrCache: NewIndexAttrCache(),
manager: manager,
cm: cm,
loadingSegments: typeutil.NewConcurrentMap[int64, *loadResult](),
@ -532,7 +536,6 @@ func (r *loadResult) SetResult(status loadStatus) {
// segmentLoader is only responsible for loading the field data from binlog
type segmentLoader struct {
*IndexAttrCache
manager *Manager
cm storage.ChunkManager
@ -594,10 +597,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
}()
for _, info := range infos {
segmentID := info.GetSegmentID()
partitionID := info.GetPartitionID()
collectionID := info.GetCollectionID()
shard := info.GetInsertChannel()
loadInfo := info
collection := loader.manager.Collection.Get(collectionID)
if collection == nil {
@ -609,26 +609,20 @@ func (loader *segmentLoader) Load(ctx context.Context,
segment, err := NewSegment(
ctx,
collection,
segmentID,
partitionID,
collectionID,
shard,
segmentType,
version,
info.GetStartPosition(),
info.GetDeltaPosition(),
info.GetLevel(),
loadInfo,
)
if err != nil {
log.Warn("load segment failed when create new segment",
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.Int64("partitionID", loadInfo.GetPartitionID()),
zap.Int64("segmentID", loadInfo.GetSegmentID()),
zap.Error(err),
)
return nil, err
}
newSegments.Insert(segmentID, segment)
newSegments.Insert(loadInfo.GetSegmentID(), segment)
}
loadSegmentFunc := func(idx int) error {
@ -1024,10 +1018,10 @@ func (loader *segmentLoader) loadSegment(ctx context.Context,
}
// load statslog if it's growing segment
if segment.typ == SegmentTypeGrowing {
if segment.segmentType == SegmentTypeGrowing {
log.Info("loading statslog...")
pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID())
err := loader.loadBloomFilter(ctx, segment.segmentID, segment.bloomFilterSet, pkStatsBinlogs, logType)
err := loader.loadBloomFilter(ctx, segment.ID(), segment.bloomFilterSet, pkStatsBinlogs, logType)
if err != nil {
return err
}
@ -1095,8 +1089,8 @@ func loadSealedSegmentFields(ctx context.Context, collection *Collection, segmen
}
log.Ctx(ctx).Info("load field binlogs done for sealed segment",
zap.Int64("collection", segment.collectionID),
zap.Int64("segment", segment.segmentID),
zap.Int64("collection", segment.Collection()),
zap.Int64("segment", segment.ID()),
zap.Int("len(field)", len(fields)),
zap.String("segmentType", segment.Type().String()))
@ -1290,7 +1284,7 @@ func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *Loca
return nil
}
log.Warn("legacy segment binlog found, start to patch entry num", zap.Int64("segmentID", segment.segmentID))
log.Warn("legacy segment binlog found, start to patch entry num", zap.Int64("segmentID", segment.ID()))
rowIDField := lo.FindOrElse(loadInfo.BinlogPaths, nil, func(binlog *datapb.FieldBinlog) bool {
return binlog.GetFieldID() == common.RowIDField
})
@ -1379,88 +1373,41 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(toMB(uint64(localDiskUsage)))
diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize
memoryUsageFactor := paramtable.Get().QueryNodeCfg.LoadMemoryUsageFactor.GetAsFloat()
factor := resourceEstimateFactor{
memoryUsageFactor: paramtable.Get().QueryNodeCfg.LoadMemoryUsageFactor.GetAsFloat(),
memoryIndexUsageFactor: paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(),
enableTempSegmentIndex: paramtable.Get().QueryNodeCfg.EnableTempSegmentIndex.GetAsBool(),
tempSegmentIndexFactor: paramtable.Get().QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat(),
deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(),
}
maxSegmentSize := uint64(0)
predictMemUsage := memUsage
predictDiskUsage := diskUsage
mmapFieldCount := 0
for _, loadInfo := range segmentLoadInfos {
var segmentMemorySize, segmentDiskSize uint64
collection := loader.manager.Collection.Get(loadInfo.GetCollectionID())
vecFieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
for _, fieldIndexInfo := range loadInfo.IndexInfos {
if fieldIndexInfo.EnableIndex {
fieldID := fieldIndexInfo.FieldID
vecFieldID2IndexInfo[fieldID] = fieldIndexInfo
}
usage, err := getResourceUsageEstimateOfSegment(collection.Schema(), loadInfo, factor)
if err != nil {
log.Warn(
"failed to estimate resource usage of segment",
zap.Int64("collectionID", loadInfo.GetCollectionID()),
zap.Int64("segmentID", loadInfo.GetSegmentID()),
zap.Error(err))
return 0, 0, err
}
for _, fieldBinlog := range loadInfo.BinlogPaths {
fieldID := fieldBinlog.FieldID
mmapEnabled := common.IsFieldMmapEnabled(collection.Schema(), fieldID)
if fieldIndexInfo, ok := vecFieldID2IndexInfo[fieldID]; ok {
neededMemSize, neededDiskSize, err := loader.GetIndexResourceUsage(fieldIndexInfo)
if err != nil {
log.Warn("failed to get index size",
zap.Int64("collectionID", loadInfo.CollectionID),
zap.Int64("segmentID", loadInfo.SegmentID),
zap.Int64("indexBuildID", fieldIndexInfo.BuildID),
zap.Error(err),
)
return 0, 0, err
}
if mmapEnabled {
segmentDiskSize += neededMemSize + neededDiskSize
} else {
segmentMemorySize += neededMemSize
segmentDiskSize += neededDiskSize
}
} else {
binlogSize := uint64(getBinlogDataSize(fieldBinlog))
if mmapEnabled {
segmentDiskSize += binlogSize
} else {
segmentMemorySize += binlogSize
enableBinlogIndex := paramtable.Get().QueryNodeCfg.EnableTempSegmentIndex.GetAsBool()
if enableBinlogIndex {
buildBinlogIndexRate := paramtable.Get().QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat()
segmentMemorySize += uint64(float64(binlogSize) * buildBinlogIndexRate)
}
}
}
if mmapEnabled {
mmapFieldCount++
}
}
// get size of stats data
for _, fieldBinlog := range loadInfo.Statslogs {
segmentMemorySize += uint64(getBinlogDataSize(fieldBinlog))
}
// binlog & statslog use general load factor
segmentMemorySize = uint64(float64(segmentMemorySize) * memoryUsageFactor)
// get size of delete data
for _, fieldBinlog := range loadInfo.Deltalogs {
segmentMemorySize += uint64(float64(getBinlogDataSize(fieldBinlog)) * paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat())
}
if segmentMemorySize > maxSegmentSize {
maxSegmentSize = segmentMemorySize
}
predictMemUsage += segmentMemorySize
predictDiskUsage += segmentDiskSize
log.Debug("segment resource for loading",
zap.Int64("segmentID", loadInfo.GetSegmentID()),
zap.Float64("memoryUsage(MB)", toMB(segmentMemorySize)),
zap.Float64("diskUsage(MB)", toMB(segmentDiskSize)),
zap.Float64("memoryLoadFactor", memoryUsageFactor),
zap.Float64("memoryUsage(MB)", toMB(usage.MemorySize)),
zap.Float64("diskUsage(MB)", toMB(usage.DiskSize)),
zap.Float64("memoryLoadFactor", factor.memoryUsageFactor),
)
mmapFieldCount += usage.MmapFieldCount
predictDiskUsage += usage.DiskSize
predictMemUsage += usage.MemorySize
if usage.MemorySize > maxSegmentSize {
maxSegmentSize = usage.MemorySize
}
}
log.Info("predict memory and disk usage while loading (in MiB)",
@ -1495,6 +1442,71 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
return predictMemUsage - memUsage, predictDiskUsage - diskUsage, nil
}
// getResourceUsageEstimateOfSegment estimates the resource usage of the segment
func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadInfo *querypb.SegmentLoadInfo, multiplyFactor resourceEstimateFactor) (usage *ResourceUsage, err error) {
	var memSize, diskSize uint64
	mmapFields := 0

	// Index metadata keyed by field ID; only fields with an enabled index count.
	indexedFields := make(map[int64]*querypb.FieldIndexInfo)
	for _, indexInfo := range loadInfo.IndexInfos {
		if indexInfo.EnableIndex {
			indexedFields[indexInfo.FieldID] = indexInfo
		}
	}

	for _, binlog := range loadInfo.BinlogPaths {
		fieldID := binlog.FieldID
		mmapEnabled := common.IsFieldMmapEnabled(schema, fieldID)

		indexInfo, indexed := indexedFields[fieldID]
		if indexed {
			// Indexed field: cost comes from the index itself.
			neededMemSize, neededDiskSize, err := getIndexAttrCache().GetIndexResourceUsage(indexInfo, multiplyFactor.memoryIndexUsageFactor)
			if err != nil {
				return nil, errors.Wrapf(err, "failed to get index size collection %d, segment %d, indexBuildID %d",
					loadInfo.GetCollectionID(),
					loadInfo.GetSegmentID(),
					indexInfo.GetBuildID())
			}
			if mmapEnabled {
				// mmapped index lives on disk entirely.
				diskSize += neededMemSize + neededDiskSize
			} else {
				memSize += neededMemSize
				diskSize += neededDiskSize
			}
		} else {
			// Raw binlog data is loaded directly.
			rawSize := uint64(getBinlogDataSize(binlog))
			if mmapEnabled {
				diskSize += rawSize
			} else {
				memSize += rawSize
				if multiplyFactor.enableTempSegmentIndex {
					// Temporary interim index built in memory on top of raw data.
					memSize += uint64(float64(rawSize) * multiplyFactor.tempSegmentIndexFactor)
				}
			}
		}
		if mmapEnabled {
			mmapFields++
		}
	}

	// get size of stats data
	for _, statslog := range loadInfo.Statslogs {
		memSize += uint64(getBinlogDataSize(statslog))
	}

	// binlog & statslog use general load factor
	memSize = uint64(float64(memSize) * multiplyFactor.memoryUsageFactor)

	// get size of delete data
	for _, deltalog := range loadInfo.Deltalogs {
		memSize += uint64(float64(getBinlogDataSize(deltalog)) * multiplyFactor.deltaDataExpansionFactor)
	}

	return &ResourceUsage{
		MemorySize:     memSize,
		DiskSize:       diskSize,
		MmapFieldCount: mmapFields,
	}, nil
}
func (loader *segmentLoader) getFieldType(collectionID, fieldID int64) (schemapb.DataType, error) {
collection := loader.manager.Collection.Get(collectionID)
if collection == nil {

View File

@ -63,15 +63,25 @@ func (suite *SegmentSuite) SetupTest() {
suite.sealed, err = NewSegment(ctx,
suite.collection,
suite.segmentID,
suite.partitionID,
suite.collectionID,
"dml",
SegmentTypeSealed,
0,
nil,
nil,
datapb.SegmentLevel_Legacy,
&querypb.SegmentLoadInfo{
CollectionID: suite.collectionID,
SegmentID: suite.segmentID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
Level: datapb.SegmentLevel_Legacy,
BinlogPaths: []*datapb.FieldBinlog{
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{
LogSize: 10086,
},
},
},
},
},
)
suite.Require().NoError(err)
@ -91,15 +101,15 @@ func (suite *SegmentSuite) SetupTest() {
suite.growing, err = NewSegment(ctx,
suite.collection,
suite.segmentID+1,
suite.partitionID,
suite.collectionID,
"dml",
SegmentTypeGrowing,
0,
nil,
nil,
datapb.SegmentLevel_Legacy,
&querypb.SegmentLoadInfo{
SegmentID: suite.segmentID + 1,
CollectionID: suite.collectionID,
PartitionID: suite.partitionID,
InsertChannel: "dml",
Level: datapb.SegmentLevel_Legacy,
},
)
suite.Require().NoError(err)
@ -122,6 +132,26 @@ func (suite *SegmentSuite) TearDownTest() {
suite.chunkManager.RemoveWithPrefix(ctx, suite.rootPath)
}
func (suite *SegmentSuite) TestLoadInfo() {
	// A sealed segment is constructed from a load info and must retain it.
	suite.NotNil(suite.sealed.LoadInfo())
	// A growing segment is not created through a load request, so it carries none.
	suite.Nil(suite.growing.LoadInfo())
}
func (suite *SegmentSuite) TestResourceUsageEstimate() {
	// growing segment can not estimate resource usage, so all fields are zero
	usage := suite.growing.ResourceUsageEstimate()
	suite.Zero(usage.MemorySize)
	suite.Zero(usage.DiskSize)
	// sealed segment estimates usage from its load info: the in-memory binlog
	// contributes memory size, with no disk usage and no mmapped fields here
	usage = suite.sealed.ResourceUsageEstimate()
	suite.NotZero(usage.MemorySize)
	suite.Zero(usage.DiskSize)
	suite.Zero(usage.MmapFieldCount)
}
func (suite *SegmentSuite) TestDelete() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@ -223,14 +223,15 @@ func (suite *QueryNodeSuite) TestStop() {
segment, err := segments.NewSegment(
context.Background(),
collection,
100,
10,
1,
"test_stop_channel",
segments.SegmentTypeSealed,
1, nil,
nil,
datapb.SegmentLevel_Legacy,
1,
&querypb.SegmentLoadInfo{
SegmentID: 100,
PartitionID: 10,
CollectionID: 1,
InsertChannel: "test_stop_channel",
Level: datapb.SegmentLevel_Legacy,
},
)
suite.NoError(err)
suite.node.manager.Segment.Put(segments.SegmentTypeSealed, segment)