From 8e293dc1ced4d2802c371bf25528b03150ce821b Mon Sep 17 00:00:00 2001 From: chyezh Date: Tue, 19 Mar 2024 11:53:05 +0800 Subject: [PATCH] enhance: add resource usage estimate for segment interface (#31050) issue: #30931 - move resource estimate function outside from segment loader. - add load info and collection to base segment. - add resource usage method for sealed segment. Signed-off-by: chyezh --- .../querynodev2/delegator/delegator_data.go | 18 +- .../querynodev2/segments/index_attr_cache.go | 12 +- .../segments/index_attr_cache_test.go | 10 +- internal/querynodev2/segments/manager_test.go | 14 +- internal/querynodev2/segments/mock_segment.go | 82 +++---- internal/querynodev2/segments/plan_test.go | 2 + internal/querynodev2/segments/reduce_test.go | 14 +- .../querynodev2/segments/retrieve_test.go | 28 +-- internal/querynodev2/segments/search_test.go | 28 +-- internal/querynodev2/segments/segment.go | 189 ++++++++------- .../querynodev2/segments/segment_interface.go | 15 +- internal/querynodev2/segments/segment_l0.go | 19 +- .../querynodev2/segments/segment_loader.go | 224 +++++++++--------- internal/querynodev2/segments/segment_test.go | 58 +++-- internal/querynodev2/server_test.go | 15 +- 15 files changed, 399 insertions(+), 329 deletions(-) diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 6a9778cf96..9a4871cb2d 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -87,18 +87,22 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { growing := sd.segmentManager.GetGrowing(segmentID) if growing == nil { var err error + // TODO: It's a wired implementation that growing segment have load info. + // we should separate the growing segment and sealed segment by type system. 
growing, err = segments.NewSegment( context.Background(), sd.collection, - segmentID, - insertData.PartitionID, - sd.collectionID, - sd.vchannelName, segments.SegmentTypeGrowing, 0, - insertData.StartPosition, - insertData.StartPosition, - datapb.SegmentLevel_L1, + &querypb.SegmentLoadInfo{ + SegmentID: segmentID, + PartitionID: insertData.PartitionID, + CollectionID: sd.collectionID, + InsertChannel: sd.vchannelName, + StartPosition: insertData.StartPosition, + DeltaPosition: insertData.StartPosition, + Level: datapb.SegmentLevel_L1, + }, ) if err != nil { log.Error("failed to create new segment", diff --git a/internal/querynodev2/segments/index_attr_cache.go b/internal/querynodev2/segments/index_attr_cache.go index df0a1667ce..2e5d95b380 100644 --- a/internal/querynodev2/segments/index_attr_cache.go +++ b/internal/querynodev2/segments/index_attr_cache.go @@ -32,10 +32,16 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" - "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) +var indexAttrCache = NewIndexAttrCache() + +// getIndexAttrCache use a singleton to store index meta cache. +func getIndexAttrCache() *IndexAttrCache { + return indexAttrCache +} + // IndexAttrCache index meta cache stores calculated attribute. 
type IndexAttrCache struct { loadWithDisk *typeutil.ConcurrentMap[typeutil.Pair[string, int32], bool] @@ -48,7 +54,7 @@ func NewIndexAttrCache() *IndexAttrCache { } } -func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo) (memory uint64, disk uint64, err error) { +func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo, memoryIndexLoadPredictMemoryUsageFactor float64) (memory uint64, disk uint64, err error) { indexType, err := funcutil.GetAttrByKeyFromRepeatedKV(common.IndexTypeKey, indexInfo.IndexParams) if err != nil { return 0, 0, fmt.Errorf("index type not exist in index params") @@ -79,7 +85,7 @@ func (c *IndexAttrCache) GetIndexResourceUsage(indexInfo *querypb.FieldIndexInfo factor := float64(1) diskUsage := uint64(0) if !isLoadWithDisk { - factor = paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat() + factor = memoryIndexLoadPredictMemoryUsageFactor } else { diskUsage = uint64(indexInfo.IndexSize) } diff --git a/internal/querynodev2/segments/index_attr_cache_test.go b/internal/querynodev2/segments/index_attr_cache_test.go index 9ded2e0377..c8705cfec9 100644 --- a/internal/querynodev2/segments/index_attr_cache_test.go +++ b/internal/querynodev2/segments/index_attr_cache_test.go @@ -51,7 +51,7 @@ func (s *IndexAttrCacheSuite) TestCacheMissing() { CurrentIndexVersion: 0, } - _, _, err := s.c.GetIndexResourceUsage(info) + _, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()) s.Require().NoError(err) _, has := s.c.loadWithDisk.Get(typeutil.NewPair[string, int32]("test", 0)) @@ -67,7 +67,7 @@ func (s *IndexAttrCacheSuite) TestDiskANN() { IndexSize: 100, } - memory, disk, err := s.c.GetIndexResourceUsage(info) + memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()) s.Require().NoError(err) _, has := 
s.c.loadWithDisk.Get(typeutil.NewPair[string, int32](indexparamcheck.IndexDISKANN, 0)) @@ -88,7 +88,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() { s.Run("load_with_disk", func() { s.c.loadWithDisk.Insert(typeutil.NewPair[string, int32]("test", 0), true) - memory, disk, err := s.c.GetIndexResourceUsage(info) + memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()) s.Require().NoError(err) s.EqualValues(100, memory) @@ -97,7 +97,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() { s.Run("load_with_disk", func() { s.c.loadWithDisk.Insert(typeutil.NewPair[string, int32]("test", 0), false) - memory, disk, err := s.c.GetIndexResourceUsage(info) + memory, disk, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()) s.Require().NoError(err) s.Equal(uint64(250), memory) @@ -109,7 +109,7 @@ func (s *IndexAttrCacheSuite) TestLoadWithDisk() { IndexParams: []*commonpb.KeyValuePair{}, } - _, _, err := s.c.GetIndexResourceUsage(info) + _, _, err := s.c.GetIndexResourceUsage(info, paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat()) s.Error(err) }) } diff --git a/internal/querynodev2/segments/manager_test.go b/internal/querynodev2/segments/manager_test.go index 982a3b4389..c046d78ad4 100644 --- a/internal/querynodev2/segments/manager_test.go +++ b/internal/querynodev2/segments/manager_test.go @@ -47,15 +47,15 @@ func (s *ManagerSuite) SetupTest() { segment, err := NewSegment( context.Background(), NewCollection(s.collectionIDs[i], schema, GenTestIndexMeta(s.collectionIDs[i], schema), querypb.LoadType_LoadCollection), - id, - s.partitionIDs[i], - s.collectionIDs[i], - s.channels[i], s.types[i], 0, - nil, - nil, - s.levels[i], + &querypb.SegmentLoadInfo{ + SegmentID: id, + PartitionID: s.partitionIDs[i], + CollectionID: s.collectionIDs[i], + InsertChannel: s.channels[i], + Level: s.levels[i], 
+ }, ) s.Require().NoError(err) s.segments = append(s.segments, segment) diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go index c798c1b6cc..b9685fd499 100644 --- a/internal/querynodev2/segments/mock_segment.go +++ b/internal/querynodev2/segments/mock_segment.go @@ -459,47 +459,6 @@ func (_c *MockSegment_InsertCount_Call) RunAndReturn(run func() int64) *MockSegm return _c } -// IsLazyLoad provides a mock function with given fields: -func (_m *MockSegment) IsLazyLoad() bool { - ret := _m.Called() - - var r0 bool - if rf, ok := ret.Get(0).(func() bool); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - -// MockSegment_IsLazyLoad_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsLazyLoad' -type MockSegment_IsLazyLoad_Call struct { - *mock.Call -} - -// IsLazyLoad is a helper method to define mock.On call -func (_e *MockSegment_Expecter) IsLazyLoad() *MockSegment_IsLazyLoad_Call { - return &MockSegment_IsLazyLoad_Call{Call: _e.mock.On("IsLazyLoad")} -} - -func (_c *MockSegment_IsLazyLoad_Call) Run(run func()) *MockSegment_IsLazyLoad_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockSegment_IsLazyLoad_Call) Return(_a0 bool) *MockSegment_IsLazyLoad_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockSegment_IsLazyLoad_Call) RunAndReturn(run func() bool) *MockSegment_IsLazyLoad_Call { - _c.Call.Return(run) - return _c -} - // LastDeltaTimestamp provides a mock function with given fields: func (_m *MockSegment) LastDeltaTimestamp() uint64 { ret := _m.Called() @@ -952,6 +911,47 @@ func (_c *MockSegment_Release_Call) RunAndReturn(run func(...releaseOption)) *Mo return _c } +// ResourceUsageEstimate provides a mock function with given fields: +func (_m *MockSegment) ResourceUsageEstimate() ResourceUsage { + ret := _m.Called() + + var r0 ResourceUsage + if rf, ok := ret.Get(0).(func() ResourceUsage); 
ok { + r0 = rf() + } else { + r0 = ret.Get(0).(ResourceUsage) + } + + return r0 +} + +// MockSegment_ResourceUsageEstimate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ResourceUsageEstimate' +type MockSegment_ResourceUsageEstimate_Call struct { + *mock.Call +} + +// ResourceUsageEstimate is a helper method to define mock.On call +func (_e *MockSegment_Expecter) ResourceUsageEstimate() *MockSegment_ResourceUsageEstimate_Call { + return &MockSegment_ResourceUsageEstimate_Call{Call: _e.mock.On("ResourceUsageEstimate")} +} + +func (_c *MockSegment_ResourceUsageEstimate_Call) Run(run func()) *MockSegment_ResourceUsageEstimate_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSegment_ResourceUsageEstimate_Call) Return(_a0 ResourceUsage) *MockSegment_ResourceUsageEstimate_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegment_ResourceUsageEstimate_Call) RunAndReturn(run func() ResourceUsage) *MockSegment_ResourceUsageEstimate_Call { + _c.Call.Return(run) + return _c +} + // Retrieve provides a mock function with given fields: ctx, plan func (_m *MockSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segcorepb.RetrieveResults, error) { ret := _m.Called(ctx, plan) diff --git a/internal/querynodev2/segments/plan_test.go b/internal/querynodev2/segments/plan_test.go index 9de3275eab..d3e9ed178a 100644 --- a/internal/querynodev2/segments/plan_test.go +++ b/internal/querynodev2/segments/plan_test.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/planpb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) type PlanSuite struct { @@ -78,5 +79,6 @@ func (suite *PlanSuite) TestQueryPlanCollectionReleased() { } func TestPlan(t *testing.T) { + paramtable.Init() suite.Run(t, new(PlanSuite)) } diff --git 
a/internal/querynodev2/segments/reduce_test.go b/internal/querynodev2/segments/reduce_test.go index 9ba6fa182b..0086965715 100644 --- a/internal/querynodev2/segments/reduce_test.go +++ b/internal/querynodev2/segments/reduce_test.go @@ -75,15 +75,15 @@ func (suite *ReduceSuite) SetupTest() { ) suite.segment, err = NewSegment(ctx, suite.collection, - suite.segmentID, - suite.partitionID, - suite.collectionID, - "dml", SegmentTypeSealed, 0, - nil, - nil, - datapb.SegmentLevel_Legacy, + &querypb.SegmentLoadInfo{ + SegmentID: suite.segmentID, + CollectionID: suite.collectionID, + PartitionID: suite.partitionID, + InsertChannel: "dml", + Level: datapb.SegmentLevel_Legacy, + }, ) suite.Require().NoError(err) diff --git a/internal/querynodev2/segments/retrieve_test.go b/internal/querynodev2/segments/retrieve_test.go index 0c8e2803ab..02a0eae1bf 100644 --- a/internal/querynodev2/segments/retrieve_test.go +++ b/internal/querynodev2/segments/retrieve_test.go @@ -85,15 +85,15 @@ func (suite *RetrieveSuite) SetupTest() { suite.sealed, err = NewSegment(ctx, suite.collection, - suite.segmentID, - suite.partitionID, - suite.collectionID, - "dml", SegmentTypeSealed, 0, - nil, - nil, - datapb.SegmentLevel_Legacy, + &querypb.SegmentLoadInfo{ + SegmentID: suite.segmentID, + CollectionID: suite.collectionID, + PartitionID: suite.partitionID, + InsertChannel: "dml", + Level: datapb.SegmentLevel_Legacy, + }, ) suite.Require().NoError(err) @@ -113,15 +113,15 @@ func (suite *RetrieveSuite) SetupTest() { suite.growing, err = NewSegment(ctx, suite.collection, - suite.segmentID+1, - suite.partitionID, - suite.collectionID, - "dml", SegmentTypeGrowing, 0, - nil, - nil, - datapb.SegmentLevel_Legacy, + &querypb.SegmentLoadInfo{ + SegmentID: suite.segmentID + 1, + CollectionID: suite.collectionID, + PartitionID: suite.partitionID, + InsertChannel: "dml", + Level: datapb.SegmentLevel_Legacy, + }, ) suite.Require().NoError(err) diff --git a/internal/querynodev2/segments/search_test.go 
b/internal/querynodev2/segments/search_test.go index 40ddddb34f..dd0e14ee4a 100644 --- a/internal/querynodev2/segments/search_test.go +++ b/internal/querynodev2/segments/search_test.go @@ -76,15 +76,15 @@ func (suite *SearchSuite) SetupTest() { suite.sealed, err = NewSegment(ctx, suite.collection, - suite.segmentID, - suite.partitionID, - suite.collectionID, - "dml", SegmentTypeSealed, 0, - nil, - nil, - datapb.SegmentLevel_Legacy, + &querypb.SegmentLoadInfo{ + SegmentID: suite.segmentID, + CollectionID: suite.collectionID, + PartitionID: suite.partitionID, + InsertChannel: "dml", + Level: datapb.SegmentLevel_Legacy, + }, ) suite.Require().NoError(err) @@ -104,15 +104,15 @@ func (suite *SearchSuite) SetupTest() { suite.growing, err = NewSegment(ctx, suite.collection, - suite.segmentID+1, - suite.partitionID, - suite.collectionID, - "dml", SegmentTypeGrowing, 0, - nil, - nil, - datapb.SegmentLevel_Legacy, + &querypb.SegmentLoadInfo{ + SegmentID: suite.segmentID + 1, + CollectionID: suite.collectionID, + PartitionID: suite.partitionID, + InsertChannel: "dml", + Level: datapb.SegmentLevel_Legacy, + }, ) suite.Require().NoError(err) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 1275ef1f07..177cee15bf 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -77,67 +77,61 @@ type IndexedFieldInfo struct { } type baseSegment struct { - segmentID int64 - partitionID int64 - shard string - collectionID int64 - typ SegmentType - level datapb.SegmentLevel - version *atomic.Int64 + collection *Collection + version *atomic.Int64 // the load status of the segment, // only transitions below are allowed: // 1. LoadStatusMeta <-> LoadStatusMapped // 2. 
LoadStatusMeta <-> LoadStatusInMemory loadStatus *atomic.String - isLazyLoad bool - startPosition *msgpb.MsgPosition // for growing segment release + segmentType SegmentType bloomFilterSet *pkoracle.BloomFilterSet loadInfo *querypb.SegmentLoadInfo + + resourceUsageCache *atomic.Pointer[ResourceUsage] } -func newBaseSegment(id, partitionID, collectionID int64, shard string, typ SegmentType, level datapb.SegmentLevel, version int64, startPosition *msgpb.MsgPosition) baseSegment { +func newBaseSegment(collection *Collection, segmentType SegmentType, version int64, loadInfo *querypb.SegmentLoadInfo) baseSegment { return baseSegment{ - segmentID: id, - partitionID: partitionID, - collectionID: collectionID, - shard: shard, - typ: typ, - level: level, + collection: collection, + loadInfo: loadInfo, version: atomic.NewInt64(version), loadStatus: atomic.NewString(string(LoadStatusMeta)), - startPosition: startPosition, - bloomFilterSet: pkoracle.NewBloomFilterSet(id, partitionID, typ), + segmentType: segmentType, + bloomFilterSet: pkoracle.NewBloomFilterSet(loadInfo.GetSegmentID(), loadInfo.GetPartitionID(), segmentType), + + resourceUsageCache: atomic.NewPointer[ResourceUsage](nil), } } // ID returns the identity number. 
func (s *baseSegment) ID() int64 { - return s.segmentID + return s.loadInfo.GetSegmentID() } func (s *baseSegment) Collection() int64 { - return s.collectionID + return s.loadInfo.GetCollectionID() } func (s *baseSegment) Partition() int64 { - return s.partitionID + return s.loadInfo.GetPartitionID() } func (s *baseSegment) Shard() string { - return s.shard + return s.loadInfo.GetInsertChannel() } func (s *baseSegment) Type() SegmentType { - return s.typ + return s.segmentType } func (s *baseSegment) Level() datapb.SegmentLevel { - return s.level + return s.loadInfo.GetLevel() } func (s *baseSegment) StartPosition() *msgpb.MsgPosition { - return s.startPosition + return s.loadInfo.GetStartPosition() } func (s *baseSegment) Version() int64 { @@ -152,11 +146,11 @@ func (s *baseSegment) LoadStatus() LoadStatus { return LoadStatus(s.loadStatus.Load()) } -func (s *baseSegment) IsLazyLoad() bool { - return s.isLazyLoad -} - func (s *baseSegment) LoadInfo() *querypb.SegmentLoadInfo { + if s.segmentType == SegmentTypeGrowing { + // Growing segment do not have load info. + return nil + } return s.loadInfo } @@ -171,6 +165,32 @@ func (s *baseSegment) MayPkExist(pk storage.PrimaryKey) bool { return s.bloomFilterSet.MayPkExist(pk) } +// ResourceUsageEstimate returns the estimated resource usage of the segment. +func (s *baseSegment) ResourceUsageEstimate() ResourceUsage { + if s.segmentType == SegmentTypeGrowing { + // Growing segment cannot do resource usage estimate. + return ResourceUsage{} + } + cache := s.resourceUsageCache.Load() + if cache != nil { + return *cache + } + + usage, err := getResourceUsageEstimateOfSegment(s.collection.Schema(), s.loadInfo, resourceEstimateFactor{ + memoryUsageFactor: 1.0, + memoryIndexUsageFactor: 1.0, + enableTempSegmentIndex: false, + deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(), + }) + if err != nil { + // Should never failure, if failed, segment should never be loaded. 
+ log.Warn("unreachable: failed to get resource usage estimate of segment", zap.Error(err), zap.Int64("collectionID", s.Collection()), zap.Int64("segmentID", s.ID())) + return ResourceUsage{} + } + s.resourceUsageCache.Store(usage) + return *usage +} + type FieldInfo struct { datapb.FieldBinlog RowCount int64 @@ -197,23 +217,17 @@ type LocalSegment struct { func NewSegment(ctx context.Context, collection *Collection, - segmentID int64, - partitionID int64, - collectionID int64, - shard string, segmentType SegmentType, version int64, - startPosition *msgpb.MsgPosition, - deltaPosition *msgpb.MsgPosition, - level datapb.SegmentLevel, + loadInfo *querypb.SegmentLoadInfo, ) (Segment, error) { log := log.Ctx(ctx) /* CStatus NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type, CSegmentInterface* newSegment); */ - if level == datapb.SegmentLevel_L0 { - return NewL0Segment(collection, segmentID, partitionID, collectionID, shard, segmentType, version, startPosition, deltaPosition) + if loadInfo.GetLevel() == datapb.SegmentLevel_L0 { + return NewL0Segment(collection, segmentType, version, loadInfo) } var cSegType C.SegmentType switch segmentType { @@ -222,16 +236,16 @@ func NewSegment(ctx context.Context, case SegmentTypeGrowing: cSegType = C.Growing default: - return nil, fmt.Errorf("illegal segment type %d when create segment %d", segmentType, segmentID) + return nil, fmt.Errorf("illegal segment type %d when create segment %d", segmentType, loadInfo.GetSegmentID()) } var newPtr C.CSegmentInterface _, err := GetDynamicPool().Submit(func() (any, error) { - status := C.NewSegment(collection.collectionPtr, cSegType, C.int64_t(segmentID), &newPtr) + status := C.NewSegment(collection.collectionPtr, cSegType, C.int64_t(loadInfo.GetSegmentID()), &newPtr) err := HandleCStatus(ctx, &status, "NewSegmentFailed", - zap.Int64("collectionID", collectionID), - zap.Int64("partitionID", partitionID), - zap.Int64("segmentID", segmentID), + zap.Int64("collectionID", 
loadInfo.GetCollectionID()), + zap.Int64("partitionID", loadInfo.GetPartitionID()), + zap.Int64("segmentID", loadInfo.GetSegmentID()), zap.String("segmentType", segmentType.String())) return nil, err }).Await() @@ -240,15 +254,15 @@ func NewSegment(ctx context.Context, } log.Info("create segment", - zap.Int64("collectionID", collectionID), - zap.Int64("partitionID", partitionID), - zap.Int64("segmentID", segmentID), + zap.Int64("collectionID", loadInfo.GetCollectionID()), + zap.Int64("partitionID", loadInfo.GetPartitionID()), + zap.Int64("segmentID", loadInfo.GetSegmentID()), zap.String("segmentType", segmentType.String()), - zap.String("level", level.String()), + zap.String("level", loadInfo.GetLevel().String()), ) segment := &LocalSegment{ - baseSegment: newBaseSegment(segmentID, partitionID, collectionID, shard, segmentType, level, version, startPosition), + baseSegment: newBaseSegment(collection, segmentType, version, loadInfo), ptr: newPtr, lastDeltaTimestamp: atomic.NewUint64(0), fields: typeutil.NewConcurrentMap[int64, *FieldInfo](), @@ -269,33 +283,26 @@ func NewSegment(ctx context.Context, func NewSegmentV2( ctx context.Context, collection *Collection, - segmentID int64, - partitionID int64, - collectionID int64, - shard string, segmentType SegmentType, version int64, - startPosition *msgpb.MsgPosition, - deltaPosition *msgpb.MsgPosition, - storageVersion int64, - level datapb.SegmentLevel, + loadInfo *querypb.SegmentLoadInfo, ) (Segment, error) { /* CSegmentInterface NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type); */ - if level == datapb.SegmentLevel_L0 { - return NewL0Segment(collection, segmentID, partitionID, collectionID, shard, segmentType, version, startPosition, deltaPosition) + if loadInfo.GetLevel() == datapb.SegmentLevel_L0 { + return NewL0Segment(collection, segmentType, version, loadInfo) } var segmentPtr C.CSegmentInterface var status C.CStatus switch segmentType { case SegmentTypeSealed: - status = 
C.NewSegment(collection.collectionPtr, C.Sealed, C.int64_t(segmentID), &segmentPtr) + status = C.NewSegment(collection.collectionPtr, C.Sealed, C.int64_t(loadInfo.GetSegmentID()), &segmentPtr) case SegmentTypeGrowing: - status = C.NewSegment(collection.collectionPtr, C.Growing, C.int64_t(segmentID), &segmentPtr) + status = C.NewSegment(collection.collectionPtr, C.Growing, C.int64_t(loadInfo.GetSegmentID()), &segmentPtr) default: - return nil, fmt.Errorf("illegal segment type %d when create segment %d", segmentType, segmentID) + return nil, fmt.Errorf("illegal segment type %d when create segment %d", segmentType, loadInfo.GetSegmentID()) } if err := HandleCStatus(ctx, &status, "NewSegmentFailed"); err != nil { @@ -303,22 +310,22 @@ func NewSegmentV2( } log.Info("create segment", - zap.Int64("collectionID", collectionID), - zap.Int64("partitionID", partitionID), - zap.Int64("segmentID", segmentID), + zap.Int64("collectionID", loadInfo.GetCollectionID()), + zap.Int64("partitionID", loadInfo.GetPartitionID()), + zap.Int64("segmentID", loadInfo.GetSegmentID()), zap.String("segmentType", segmentType.String())) - url, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), segmentID) + url, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), loadInfo.GetSegmentID()) if err != nil { return nil, err } - space, err := milvus_storage.Open(url, options.NewSpaceOptionBuilder().SetVersion(storageVersion).Build()) + space, err := milvus_storage.Open(url, options.NewSpaceOptionBuilder().SetVersion(loadInfo.GetStorageVersion()).Build()) if err != nil { return nil, err } segment := &LocalSegment{ - baseSegment: newBaseSegment(segmentID, partitionID, collectionID, shard, segmentType, level, version, startPosition), + baseSegment: newBaseSegment(collection, segmentType, version, loadInfo), ptr: 
segmentPtr, lastDeltaTimestamp: atomic.NewUint64(0), fields: typeutil.NewConcurrentMap[int64, *FieldInfo](), @@ -461,13 +468,13 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S log := log.Ctx(ctx).With( zap.Int64("collectionID", s.Collection()), zap.Int64("segmentID", s.ID()), - zap.String("segmentType", s.typ.String()), + zap.String("segmentType", s.segmentType.String()), ) s.ptrLock.RLock() defer s.ptrLock.RUnlock() if s.ptr == nil { - return nil, merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") + return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } traceCtx := ParseCTraceContext(ctx) @@ -493,7 +500,7 @@ func (s *LocalSegment) Search(ctx context.Context, searchReq *SearchRequest) (*S if err := HandleCStatus(ctx, &status, "Search failed", zap.Int64("collectionID", s.Collection()), zap.Int64("segmentID", s.ID()), - zap.String("segmentType", s.typ.String())); err != nil { + zap.String("segmentType", s.segmentType.String())); err != nil { return nil, err } log.Debug("search segment done") @@ -505,7 +512,7 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco defer s.ptrLock.RUnlock() if s.ptr == nil { - return nil, merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") + return nil, merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } log := log.Ctx(ctx).With( @@ -513,7 +520,7 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), zap.Int64("msgID", plan.msgID), - zap.String("segmentType", s.typ.String()), + zap.String("segmentType", s.segmentType.String()), ) traceCtx := ParseCTraceContext(ctx) @@ -542,7 +549,7 @@ func (s *LocalSegment) Retrieve(ctx context.Context, plan *RetrievePlan) (*segco zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), zap.Int64("msgID", plan.msgID), - zap.String("segmentType", s.typ.String())); err != nil { + 
zap.String("segmentType", s.segmentType.String())); err != nil { return nil, err } @@ -595,14 +602,14 @@ func (s *LocalSegment) preInsert(ctx context.Context, numOfRecords int) (int64, func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error { if s.Type() != SegmentTypeGrowing { - return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.typ.String()) + return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String()) } s.ptrLock.RLock() defer s.ptrLock.RUnlock() if s.ptr == nil { - return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") + return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } offset, err := s.preInsert(ctx, len(rowIDs)) @@ -662,7 +669,7 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary defer s.ptrLock.RUnlock() if s.ptr == nil { - return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") + return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } cOffset := C.int64_t(0) // depre @@ -728,7 +735,7 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context, rowCount int64, f defer s.ptrLock.RUnlock() if s.ptr == nil { - return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") + return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } log := log.Ctx(ctx).With( @@ -763,7 +770,7 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context, rowCount int64, f var status C.CStatus GetLoadPool().Submit(func() (any, error) { if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { - uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.segmentID) + uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.ID()) 
if err != nil { return nil, err } @@ -831,11 +838,11 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun s.ptrLock.RLock() defer s.ptrLock.RUnlock() - ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.segmentID, fieldID)) + ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadFieldData-%d-%d", s.ID(), fieldID)) defer sp.End() if s.ptr == nil { - return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") + return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } log := log.Ctx(ctx).With( @@ -875,7 +882,7 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun GetLoadPool().Submit(func() (any, error) { log.Info("submitted loadFieldData task to load pool") if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { - uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.segmentID) + uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.ID()) if err != nil { return nil, err } @@ -996,7 +1003,7 @@ func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fie defer s.ptrLock.RUnlock() if s.ptr == nil { - return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") + return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } log := log.Ctx(ctx).With( @@ -1051,7 +1058,7 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del defer s.ptrLock.RUnlock() if s.ptr == nil { - return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") + return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } log := log.Ctx(ctx).With( @@ -1137,7 +1144,7 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn return nil } - ctx, sp := 
otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadIndex-%d-%d", s.segmentID, indexInfo.GetFieldID())) + ctx, sp := otel.Tracer(typeutil.QueryNodeRole).Start(ctx, fmt.Sprintf("LoadIndex-%d-%d", s.ID(), indexInfo.GetFieldID())) defer sp.End() log := log.Ctx(ctx).With( @@ -1155,14 +1162,14 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn defer deleteLoadIndexInfo(loadIndexInfo) if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { - uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.segmentID) + uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.ID()) if err != nil { return err } loadIndexInfo.appendStorageInfo(uri, indexInfo.IndexStoreVersion) } - err = loadIndexInfo.appendLoadIndexInfo(ctx, indexInfo, s.collectionID, s.partitionID, s.segmentID, fieldType) + err = loadIndexInfo.appendLoadIndexInfo(ctx, indexInfo, s.Collection(), s.Partition(), s.ID(), fieldType) if err != nil { if loadIndexInfo.cleanLocalData(ctx) != nil { log.Warn("failed to clean cached data on disk after append index failed", @@ -1172,7 +1179,7 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn return err } if s.Type() != SegmentTypeSealed { - errMsg := fmt.Sprintln("updateSegmentIndex failed, illegal segment type ", s.typ, "segmentID = ", s.ID()) + errMsg := fmt.Sprintln("updateSegmentIndex failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID()) return errors.New(errMsg) } @@ -1199,7 +1206,7 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F defer s.ptrLock.RUnlock() if s.ptr == nil { - return merr.WrapErrSegmentNotLoaded(s.segmentID, "segment released") + return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released") } var status C.CStatus @@ -1368,10 
+1375,10 @@ func (s *LocalSegment) Release(opts ...releaseOption) { } log.Info("delete segment from memory", - zap.Int64("collectionID", s.collectionID), - zap.Int64("partitionID", s.partitionID), + zap.Int64("collectionID", s.Collection()), + zap.Int64("partitionID", s.Partition()), zap.Int64("segmentID", s.ID()), - zap.String("segmentType", s.typ.String()), + zap.String("segmentType", s.segmentType.String()), zap.Int64("insertCount", s.InsertCount()), ) } diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index f8e28e828c..017b3b4d93 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -35,7 +35,19 @@ const ( LoadStatusInMemory LoadStatus = "in_memory" ) +// ResourceUsage is used to estimate the resource usage of a sealed segment. +type ResourceUsage struct { + MemorySize uint64 + DiskSize uint64 + MmapFieldCount int +} + +// Segment is the interface of a segment implementation. +// Some methods can not apply to all segment types,such as LoadInfo, ResourceUsageEstimate. +// Add more interface to represent different segment types is a better implementation. 
type Segment interface { + // ResourceUsageEstimate() ResourceUsage + // Properties ID() int64 Collection() int64 @@ -47,7 +59,6 @@ type Segment interface { Type() SegmentType Level() datapb.SegmentLevel LoadStatus() LoadStatus - IsLazyLoad() bool LoadInfo() *querypb.SegmentLoadInfo RLock() error RUnlock() @@ -58,6 +69,8 @@ type Segment interface { // RowNum returns the number of rows, it's slow, so DO NOT call it in a loop RowNum() int64 MemSize() int64 + // ResourceUsageEstimate returns the estimated resource usage of the segment + ResourceUsageEstimate() ResourceUsage // Index related GetIndex(fieldID int64) *IndexedFieldInfo diff --git a/internal/querynodev2/segments/segment_l0.go b/internal/querynodev2/segments/segment_l0.go index ddd6f55e96..e71bc16335 100644 --- a/internal/querynodev2/segments/segment_l0.go +++ b/internal/querynodev2/segments/segment_l0.go @@ -21,8 +21,8 @@ import ( "github.com/samber/lo" "go.uber.org/zap" - "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" storage "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/log" @@ -41,14 +41,9 @@ type L0Segment struct { } func NewL0Segment(collection *Collection, - segmentID int64, - partitionID int64, - collectionID int64, - shard string, segmentType SegmentType, version int64, - startPosition *msgpb.MsgPosition, - deltaPosition *msgpb.MsgPosition, + loadInfo *querypb.SegmentLoadInfo, ) (Segment, error) { /* CSegmentInterface @@ -56,13 +51,13 @@ func NewL0Segment(collection *Collection, */ log.Info("create L0 segment", - zap.Int64("collectionID", collectionID), - zap.Int64("partitionID", partitionID), - zap.Int64("segmentID", segmentID), + zap.Int64("collectionID", loadInfo.GetCollectionID()), + zap.Int64("partitionID", loadInfo.GetPartitionID()), + zap.Int64("segmentID", loadInfo.GetSegmentID()), 
zap.String("segmentType", segmentType.String())) segment := &L0Segment{ - baseSegment: newBaseSegment(segmentID, partitionID, collectionID, shard, segmentType, datapb.SegmentLevel_L0, version, startPosition), + baseSegment: newBaseSegment(collection, segmentType, version, loadInfo), } // level 0 segments are always in memory @@ -119,7 +114,7 @@ func (s *L0Segment) Indexes() []*IndexedFieldInfo { } func (s *L0Segment) Type() SegmentType { - return s.typ + return s.segmentType } func (s *L0Segment) Level() datapb.SegmentLevel { diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index e6bf7d31b8..7a0b4f4ac1 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -95,6 +95,14 @@ func (r *LoadResource) Sub(resource LoadResource) { r.DiskSize -= resource.DiskSize } +type resourceEstimateFactor struct { + memoryUsageFactor float64 + memoryIndexUsageFactor float64 + enableTempSegmentIndex bool + tempSegmentIndexFactor float64 + deltaDataExpansionFactor float64 +} + type segmentLoaderV2 struct { *segmentLoader } @@ -164,29 +172,26 @@ func (loader *segmentLoaderV2) Load(ctx context.Context, }() for _, info := range infos { - segmentID := info.GetSegmentID() - partitionID := info.GetPartitionID() - collectionID := info.GetCollectionID() - shard := info.GetInsertChannel() + loadInfo := info - collection := loader.manager.Collection.Get(collectionID) + collection := loader.manager.Collection.Get(loadInfo.GetCollectionID()) if collection == nil { - err := merr.WrapErrCollectionNotFound(collectionID) + err := merr.WrapErrCollectionNotFound(loadInfo.GetCollectionID()) log.Warn("failed to get collection", zap.Error(err)) return nil, err } - segment, err := NewSegmentV2(ctx, collection, segmentID, partitionID, collectionID, shard, segmentType, version, info.GetStartPosition(), info.GetDeltaPosition(), info.GetStorageVersion(), info.GetLevel()) + segment, err := 
NewSegmentV2(ctx, collection, segmentType, version, loadInfo) if err != nil { log.Warn("load segment failed when create new segment", - zap.Int64("partitionID", partitionID), - zap.Int64("segmentID", segmentID), + zap.Int64("partitionID", loadInfo.GetPartitionID()), + zap.Int64("segmentID", loadInfo.GetSegmentID()), zap.Error(err), ) return nil, err } - newSegments.Insert(segmentID, segment) + newSegments.Insert(loadInfo.GetSegmentID(), segment) } loadSegmentFunc := func(idx int) error { @@ -439,10 +444,10 @@ func (loader *segmentLoaderV2) loadSegment(ctx context.Context, } // load statslog if it's growing segment - if segment.typ == SegmentTypeGrowing { + if segment.segmentType == SegmentTypeGrowing { log.Info("loading statslog...") // pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID()) - err := loader.loadBloomFilter(ctx, segment.segmentID, segment.bloomFilterSet, loadInfo.StorageVersion) + err := loader.loadBloomFilter(ctx, segment.ID(), segment.bloomFilterSet, loadInfo.StorageVersion) if err != nil { return err } @@ -467,8 +472,8 @@ func (loader *segmentLoaderV2) loadSealedSegmentFields(ctx context.Context, segm } log.Ctx(ctx).Info("load field binlogs done for sealed segment", - zap.Int64("collection", segment.collectionID), - zap.Int64("segment", segment.segmentID), + zap.Int64("collection", segment.Collection()), + zap.Int64("segment", segment.ID()), zap.String("segmentType", segment.Type().String())) return nil @@ -496,7 +501,6 @@ func NewLoader( log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize)) loader := &segmentLoader{ - IndexAttrCache: NewIndexAttrCache(), manager: manager, cm: cm, loadingSegments: typeutil.NewConcurrentMap[int64, *loadResult](), @@ -532,7 +536,6 @@ func (r *loadResult) SetResult(status loadStatus) { // segmentLoader is only responsible for loading the field data from binlog type segmentLoader struct { - *IndexAttrCache manager *Manager cm storage.ChunkManager @@ -594,10 
+597,7 @@ func (loader *segmentLoader) Load(ctx context.Context, }() for _, info := range infos { - segmentID := info.GetSegmentID() - partitionID := info.GetPartitionID() - collectionID := info.GetCollectionID() - shard := info.GetInsertChannel() + loadInfo := info collection := loader.manager.Collection.Get(collectionID) if collection == nil { @@ -609,26 +609,20 @@ func (loader *segmentLoader) Load(ctx context.Context, segment, err := NewSegment( ctx, collection, - segmentID, - partitionID, - collectionID, - shard, segmentType, version, - info.GetStartPosition(), - info.GetDeltaPosition(), - info.GetLevel(), + loadInfo, ) if err != nil { log.Warn("load segment failed when create new segment", - zap.Int64("partitionID", partitionID), - zap.Int64("segmentID", segmentID), + zap.Int64("partitionID", loadInfo.GetPartitionID()), + zap.Int64("segmentID", loadInfo.GetSegmentID()), zap.Error(err), ) return nil, err } - newSegments.Insert(segmentID, segment) + newSegments.Insert(loadInfo.GetSegmentID(), segment) } loadSegmentFunc := func(idx int) error { @@ -1024,10 +1018,10 @@ func (loader *segmentLoader) loadSegment(ctx context.Context, } // load statslog if it's growing segment - if segment.typ == SegmentTypeGrowing { + if segment.segmentType == SegmentTypeGrowing { log.Info("loading statslog...") pkStatsBinlogs, logType := loader.filterPKStatsBinlogs(loadInfo.Statslogs, pkField.GetFieldID()) - err := loader.loadBloomFilter(ctx, segment.segmentID, segment.bloomFilterSet, pkStatsBinlogs, logType) + err := loader.loadBloomFilter(ctx, segment.ID(), segment.bloomFilterSet, pkStatsBinlogs, logType) if err != nil { return err } @@ -1095,8 +1089,8 @@ func loadSealedSegmentFields(ctx context.Context, collection *Collection, segmen } log.Ctx(ctx).Info("load field binlogs done for sealed segment", - zap.Int64("collection", segment.collectionID), - zap.Int64("segment", segment.segmentID), + zap.Int64("collection", segment.Collection()), + zap.Int64("segment", segment.ID()), 
zap.Int("len(field)", len(fields)), zap.String("segmentType", segment.Type().String())) @@ -1290,7 +1284,7 @@ func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *Loca return nil } - log.Warn("legacy segment binlog found, start to patch entry num", zap.Int64("segmentID", segment.segmentID)) + log.Warn("legacy segment binlog found, start to patch entry num", zap.Int64("segmentID", segment.ID())) rowIDField := lo.FindOrElse(loadInfo.BinlogPaths, nil, func(binlog *datapb.FieldBinlog) bool { return binlog.GetFieldID() == common.RowIDField }) @@ -1379,88 +1373,41 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(toMB(uint64(localDiskUsage))) diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize - memoryUsageFactor := paramtable.Get().QueryNodeCfg.LoadMemoryUsageFactor.GetAsFloat() + factor := resourceEstimateFactor{ + memoryUsageFactor: paramtable.Get().QueryNodeCfg.LoadMemoryUsageFactor.GetAsFloat(), + memoryIndexUsageFactor: paramtable.Get().QueryNodeCfg.MemoryIndexLoadPredictMemoryUsageFactor.GetAsFloat(), + enableTempSegmentIndex: paramtable.Get().QueryNodeCfg.EnableTempSegmentIndex.GetAsBool(), + tempSegmentIndexFactor: paramtable.Get().QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat(), + deltaDataExpansionFactor: paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat(), + } maxSegmentSize := uint64(0) predictMemUsage := memUsage predictDiskUsage := diskUsage mmapFieldCount := 0 for _, loadInfo := range segmentLoadInfos { - var segmentMemorySize, segmentDiskSize uint64 collection := loader.manager.Collection.Get(loadInfo.GetCollectionID()) - - vecFieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo) - for _, fieldIndexInfo := range loadInfo.IndexInfos { - if fieldIndexInfo.EnableIndex { - fieldID := fieldIndexInfo.FieldID - vecFieldID2IndexInfo[fieldID] = fieldIndexInfo - } + usage, err := 
getResourceUsageEstimateOfSegment(collection.Schema(), loadInfo, factor) + if err != nil { + log.Warn( + "failed to estimate resource usage of segment", + zap.Int64("collectionID", loadInfo.GetCollectionID()), + zap.Int64("segmentID", loadInfo.GetSegmentID()), + zap.Error(err)) + return 0, 0, err } - for _, fieldBinlog := range loadInfo.BinlogPaths { - fieldID := fieldBinlog.FieldID - mmapEnabled := common.IsFieldMmapEnabled(collection.Schema(), fieldID) - if fieldIndexInfo, ok := vecFieldID2IndexInfo[fieldID]; ok { - neededMemSize, neededDiskSize, err := loader.GetIndexResourceUsage(fieldIndexInfo) - if err != nil { - log.Warn("failed to get index size", - zap.Int64("collectionID", loadInfo.CollectionID), - zap.Int64("segmentID", loadInfo.SegmentID), - zap.Int64("indexBuildID", fieldIndexInfo.BuildID), - zap.Error(err), - ) - return 0, 0, err - } - if mmapEnabled { - segmentDiskSize += neededMemSize + neededDiskSize - } else { - segmentMemorySize += neededMemSize - segmentDiskSize += neededDiskSize - } - } else { - binlogSize := uint64(getBinlogDataSize(fieldBinlog)) - if mmapEnabled { - segmentDiskSize += binlogSize - } else { - segmentMemorySize += binlogSize - enableBinlogIndex := paramtable.Get().QueryNodeCfg.EnableTempSegmentIndex.GetAsBool() - if enableBinlogIndex { - buildBinlogIndexRate := paramtable.Get().QueryNodeCfg.InterimIndexMemExpandRate.GetAsFloat() - segmentMemorySize += uint64(float64(binlogSize) * buildBinlogIndexRate) - } - } - } - - if mmapEnabled { - mmapFieldCount++ - } - } - - // get size of stats data - for _, fieldBinlog := range loadInfo.Statslogs { - segmentMemorySize += uint64(getBinlogDataSize(fieldBinlog)) - } - - // binlog & statslog use general load factor - segmentMemorySize = uint64(float64(segmentMemorySize) * memoryUsageFactor) - - // get size of delete data - for _, fieldBinlog := range loadInfo.Deltalogs { - segmentMemorySize += uint64(float64(getBinlogDataSize(fieldBinlog)) * 
paramtable.Get().QueryNodeCfg.DeltaDataExpansionRate.GetAsFloat()) - } - - if segmentMemorySize > maxSegmentSize { - maxSegmentSize = segmentMemorySize - } - - predictMemUsage += segmentMemorySize - predictDiskUsage += segmentDiskSize - log.Debug("segment resource for loading", zap.Int64("segmentID", loadInfo.GetSegmentID()), - zap.Float64("memoryUsage(MB)", toMB(segmentMemorySize)), - zap.Float64("diskUsage(MB)", toMB(segmentDiskSize)), - zap.Float64("memoryLoadFactor", memoryUsageFactor), + zap.Float64("memoryUsage(MB)", toMB(usage.MemorySize)), + zap.Float64("diskUsage(MB)", toMB(usage.DiskSize)), + zap.Float64("memoryLoadFactor", factor.memoryUsageFactor), ) + mmapFieldCount += usage.MmapFieldCount + predictDiskUsage += usage.DiskSize + predictMemUsage += usage.MemorySize + if usage.MemorySize > maxSegmentSize { + maxSegmentSize = usage.MemorySize + } } log.Info("predict memory and disk usage while loading (in MiB)", @@ -1495,6 +1442,71 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn return predictMemUsage - memUsage, predictDiskUsage - diskUsage, nil } +// getResourceUsageEstimateOfSegment estimates the resource usage of the segment +func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadInfo *querypb.SegmentLoadInfo, multiplyFactor resourceEstimateFactor) (usage *ResourceUsage, err error) { + var segmentMemorySize, segmentDiskSize uint64 + var mmapFieldCount int + + vecFieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo) + for _, fieldIndexInfo := range loadInfo.IndexInfos { + if fieldIndexInfo.EnableIndex { + fieldID := fieldIndexInfo.FieldID + vecFieldID2IndexInfo[fieldID] = fieldIndexInfo + } + } + + for _, fieldBinlog := range loadInfo.BinlogPaths { + fieldID := fieldBinlog.FieldID + mmapEnabled := common.IsFieldMmapEnabled(schema, fieldID) + if fieldIndexInfo, ok := vecFieldID2IndexInfo[fieldID]; ok { + neededMemSize, neededDiskSize, err := 
getIndexAttrCache().GetIndexResourceUsage(fieldIndexInfo, multiplyFactor.memoryIndexUsageFactor) + if err != nil { + return nil, errors.Wrapf(err, "failed to get index size collection %d, segment %d, indexBuildID %d", + loadInfo.GetCollectionID(), + loadInfo.GetSegmentID(), + fieldIndexInfo.GetBuildID()) + } + if mmapEnabled { + segmentDiskSize += neededMemSize + neededDiskSize + } else { + segmentMemorySize += neededMemSize + segmentDiskSize += neededDiskSize + } + } else { + binlogSize := uint64(getBinlogDataSize(fieldBinlog)) + if mmapEnabled { + segmentDiskSize += binlogSize + } else { + segmentMemorySize += binlogSize + if multiplyFactor.enableTempSegmentIndex { + segmentMemorySize += uint64(float64(binlogSize) * multiplyFactor.tempSegmentIndexFactor) + } + } + } + if mmapEnabled { + mmapFieldCount++ + } + } + + // get size of stats data + for _, fieldBinlog := range loadInfo.Statslogs { + segmentMemorySize += uint64(getBinlogDataSize(fieldBinlog)) + } + + // binlog & statslog use general load factor + segmentMemorySize = uint64(float64(segmentMemorySize) * multiplyFactor.memoryUsageFactor) + + // get size of delete data + for _, fieldBinlog := range loadInfo.Deltalogs { + segmentMemorySize += uint64(float64(getBinlogDataSize(fieldBinlog)) * multiplyFactor.deltaDataExpansionFactor) + } + return &ResourceUsage{ + MemorySize: segmentMemorySize, + DiskSize: segmentDiskSize, + MmapFieldCount: mmapFieldCount, + }, nil +} + func (loader *segmentLoader) getFieldType(collectionID, fieldID int64) (schemapb.DataType, error) { collection := loader.manager.Collection.Get(collectionID) if collection == nil { diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index 3875eb9d82..c03a902857 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -63,15 +63,25 @@ func (suite *SegmentSuite) SetupTest() { suite.sealed, err = NewSegment(ctx, suite.collection, - 
suite.segmentID,
-		suite.partitionID,
-		suite.collectionID,
-		"dml",
 		SegmentTypeSealed,
 		0,
-		nil,
-		nil,
-		datapb.SegmentLevel_Legacy,
+		&querypb.SegmentLoadInfo{
+			CollectionID:  suite.collectionID,
+			SegmentID:     suite.segmentID,
+			PartitionID:   suite.partitionID,
+			InsertChannel: "dml",
+			Level:         datapb.SegmentLevel_Legacy,
+			BinlogPaths: []*datapb.FieldBinlog{
+				{
+					FieldID: 101,
+					Binlogs: []*datapb.Binlog{
+						{
+							LogSize: 10086,
+						},
+					},
+				},
+			},
+		},
 	)
 	suite.Require().NoError(err)
 
@@ -91,15 +101,15 @@ func (suite *SegmentSuite) SetupTest() {
 
 	suite.growing, err = NewSegment(ctx,
 		suite.collection,
-		suite.segmentID+1,
-		suite.partitionID,
-		suite.collectionID,
-		"dml",
 		SegmentTypeGrowing,
 		0,
-		nil,
-		nil,
-		datapb.SegmentLevel_Legacy,
+		&querypb.SegmentLoadInfo{
+			SegmentID:     suite.segmentID + 1,
+			CollectionID:  suite.collectionID,
+			PartitionID:   suite.partitionID,
+			InsertChannel: "dml",
+			Level:         datapb.SegmentLevel_Legacy,
+		},
 	)
 	suite.Require().NoError(err)
 
@@ -122,6 +132,26 @@ func (suite *SegmentSuite) TearDownTest() {
 	suite.chunkManager.RemoveWithPrefix(ctx, suite.rootPath)
 }
 
+func (suite *SegmentSuite) TestLoadInfo() {
+	// sealed segment has load info
+	suite.NotNil(suite.sealed.LoadInfo())
+	// growing segment has no load info
+	suite.Nil(suite.growing.LoadInfo())
+}
+
+func (suite *SegmentSuite) TestResourceUsageEstimate() {
+	// growing segment cannot estimate resource usage,
+	// so its estimated memory and disk sizes are both zero
+	usage := suite.growing.ResourceUsageEstimate()
+	suite.Zero(usage.MemorySize)
+	suite.Zero(usage.DiskSize)
+	// sealed segment estimates resource usage from its load info
+	usage = suite.sealed.ResourceUsageEstimate()
+	suite.NotZero(usage.MemorySize)
+	suite.Zero(usage.DiskSize)
+	suite.Zero(usage.MmapFieldCount)
+}
+
 func (suite *SegmentSuite) TestDelete() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
diff --git a/internal/querynodev2/server_test.go b/internal/querynodev2/server_test.go
index 088c5f3c36..e83fcb097a 100644
--- 
a/internal/querynodev2/server_test.go +++ b/internal/querynodev2/server_test.go @@ -223,14 +223,15 @@ func (suite *QueryNodeSuite) TestStop() { segment, err := segments.NewSegment( context.Background(), collection, - 100, - 10, - 1, - "test_stop_channel", segments.SegmentTypeSealed, - 1, nil, - nil, - datapb.SegmentLevel_Legacy, + 1, + &querypb.SegmentLoadInfo{ + SegmentID: 100, + PartitionID: 10, + CollectionID: 1, + InsertChannel: "test_stop_channel", + Level: datapb.SegmentLevel_Legacy, + }, ) suite.NoError(err) suite.node.manager.Segment.Put(segments.SegmentTypeSealed, segment)