From 07bc1b6717e5b675e89ba299f0154f6afc748acd Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Fri, 19 Jul 2024 18:25:50 +0800
Subject: [PATCH] enhance: Seal by total growing segments size (#34692) (#34779)

Seals the largest growing segment if the total size of growing segments
of each shard exceeds the size threshold (default 4GB). Introducing this
policy can help keep the size of growing segments within a suitable
level, alleviating the pressure on the delegator.

issue: https://github.com/milvus-io/milvus/issues/34554

pr: https://github.com/milvus-io/milvus/pull/34692

---------

Signed-off-by: bigsheeper

---
 configs/milvus.yaml                           |   5 +
 .../datacoord/segment_allocation_policy.go    |  36 ++++-
 .../segment_allocation_policy_test.go         |  37 ++++-
 internal/datacoord/segment_info.go            |   2 +-
 internal/datacoord/segment_manager.go         |  20 ++-
 pkg/util/paramtable/component_param.go        |  11 ++
 pkg/util/paramtable/component_param_test.go   |   1 +
 .../seal_by_total_growing_test.go             | 138 ++++++++++++++++++
 .../sealpolicies/seal_policies_test.go        |  44 ++++++
 9 files changed, 285 insertions(+), 9 deletions(-)
 create mode 100644 tests/integration/sealpolicies/seal_by_total_growing_test.go
 create mode 100644 tests/integration/sealpolicies/seal_policies_test.go

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index a6f5420b82..bceded4f6d 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -450,6 +450,11 @@ dataCoord:
     # During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%.
     expansionRate: 1.25
   segmentFlushInterval: 2 # the minimal interval duration (unit: Seconds) between flushing operations on the same segment
+  sealPolicy:
+    channel:
+      # The size threshold in MB, if the total size of growing segments of each shard
+      # exceeds this threshold, the largest growing segment will be sealed.
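+      # For example, with the default 4096 (4 GB), a shard whose growing
+      # segments total 5 GB will have its largest growing segment sealed.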
+ growingSegmentsMemSize: 4096 autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version enableCompaction: true # Enable data segment compaction compaction: diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go index a17a6deff1..2c4f6bc177 100644 --- a/internal/datacoord/segment_allocation_policy.go +++ b/internal/datacoord/segment_allocation_policy.go @@ -22,6 +22,7 @@ import ( "time" "github.com/cockroachdb/errors" + "github.com/samber/lo" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -169,20 +170,47 @@ func sealL1SegmentByIdleTime(idleTimeTolerance time.Duration, minSizeToSealIdleS } // channelSealPolicy seal policy applies to channel -type channelSealPolicy func(string, []*SegmentInfo, Timestamp) []*SegmentInfo +type channelSealPolicy func(string, []*SegmentInfo, Timestamp) ([]*SegmentInfo, string) // getChannelOpenSegCapacityPolicy get channelSealPolicy with channel segment capacity policy func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy { - return func(channel string, segs []*SegmentInfo, ts Timestamp) []*SegmentInfo { + return func(channel string, segs []*SegmentInfo, ts Timestamp) ([]*SegmentInfo, string) { if len(segs) <= limit { - return []*SegmentInfo{} + return []*SegmentInfo{}, "" } sortSegmentsByLastExpires(segs) offLen := len(segs) - limit if offLen > len(segs) { offLen = len(segs) } - return segs[0:offLen] + return segs[0:offLen], fmt.Sprintf("seal by channel segment capacity, len(segs)=%d, limit=%d", len(segs), limit) + } +} + +// sealByTotalGrowingSegmentsSize seals the largest growing segment +// if the total size of growing segments exceeds the threshold. 
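+// For example, with the default 4 GB threshold, a shard whose growing
+// segments total 4.5 GB gets its single largest growing segment sealed;
+// each evaluation seals at most one segment, so the total shrinks
+// gradually rather than a whole shard being sealed at once.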
+func sealByTotalGrowingSegmentsSize() channelSealPolicy { + return func(channel string, segments []*SegmentInfo, ts Timestamp) ([]*SegmentInfo, string) { + growingSegments := lo.Filter(segments, func(segment *SegmentInfo, _ int) bool { + return segment != nil && segment.GetState() == commonpb.SegmentState_Growing + }) + + var totalSize int64 + sizeMap := lo.SliceToMap(growingSegments, func(segment *SegmentInfo) (int64, int64) { + size := segment.getSegmentSize() + totalSize += size + return segment.GetID(), size + }) + + threshold := paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.GetAsInt64() * 1024 * 1024 + if totalSize >= threshold { + target := lo.MaxBy(growingSegments, func(s1, s2 *SegmentInfo) bool { + return sizeMap[s1.GetID()] > sizeMap[s2.GetID()] + }) + return []*SegmentInfo{target}, fmt.Sprintf("seal by total growing segments size, "+ + "totalSize=%d, threshold=%d", totalSize, threshold) + } + return nil, "" } } diff --git a/internal/datacoord/segment_allocation_policy_test.go b/internal/datacoord/segment_allocation_policy_test.go index 4f3b7cf3d2..fc6f77ddc8 100644 --- a/internal/datacoord/segment_allocation_policy_test.go +++ b/internal/datacoord/segment_allocation_policy_test.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -132,7 +133,7 @@ func TestGetChannelOpenSegCapacityPolicy(t *testing.T) { }, } for _, c := range testCases { - result := p(c.channel, c.segments, c.ts) + result, _ := p(c.channel, c.segments, c.ts) if c.validator != nil { assert.True(t, c.validator(result)) } @@ -195,3 +196,37 @@ func Test_sealLongTimeIdlePolicy(t *testing.T) { shouldSeal, _ = policy.ShouldSeal(seg3, 100) assert.True(t, shouldSeal) } + +func Test_sealByTotalGrowingSegmentsSize(t *testing.T) { + paramtable.Get().Save(paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.Key, "100") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.Key) + + seg0 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + ID: 0, + State: commonpb.SegmentState_Growing, + Binlogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{MemorySize: 30 * MB}}}}, + }} + seg1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + State: commonpb.SegmentState_Growing, + Binlogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{MemorySize: 40 * MB}}}}, + }} + seg2 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + State: commonpb.SegmentState_Growing, + Binlogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{MemorySize: 50 * MB}}}}, + }} + seg3 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ + ID: 3, + State: commonpb.SegmentState_Sealed, + }} + + fn := sealByTotalGrowingSegmentsSize() + // size not reach threshold + res, _ := fn("ch-0", []*SegmentInfo{seg0}, 0) + assert.Equal(t, 0, len(res)) + // size reached the threshold + res, _ = fn("ch-0", []*SegmentInfo{seg0, seg1, seg2, seg3}, 0) + assert.Equal(t, 1, len(res)) + assert.Equal(t, seg2.GetID(), res[0].GetID()) +} diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 8dcd183632..2c609a0847 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -466,7 +466,7 @@ func SetLevel(level datapb.SegmentLevel) SegmentInfoOption { } func (s *SegmentInfo) getSegmentSize() int64 { - if s.size.Load() <= 0 { + if 
s.size.Load() <= 0 || s.GetState() == commonpb.SegmentState_Growing { var size int64 for _, binlogs := range s.GetBinlogs() { for _, l := range binlogs.GetBinlogs() { diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 5e8aa31ba8..53fef1d16a 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -198,6 +198,12 @@ func defaultSegmentSealPolicy() []SegmentSealPolicy { } } +func defaultChannelSealPolicy() []channelSealPolicy { + return []channelSealPolicy{ + sealByTotalGrowingSegmentsSize(), + } +} + func defaultFlushPolicy() flushPolicy { return flushPolicyL1 } @@ -211,8 +217,8 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) (*S segments: make([]UniqueID, 0), estimatePolicy: defaultCalUpperLimitPolicy(), allocPolicy: defaultAllocatePolicy(), - segmentSealPolicies: defaultSegmentSealPolicy(), // default only segment size policy - channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy + segmentSealPolicies: defaultSegmentSealPolicy(), + channelSealPolicies: defaultChannelSealPolicy(), flushPolicy: defaultFlushPolicy(), } for _, opt := range opts { @@ -644,6 +650,7 @@ func isEmptySealedSegment(segment *SegmentInfo, ts Timestamp) bool { // tryToSealSegment applies segment & channel seal policies func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error { channelInfo := make(map[string][]*SegmentInfo) + sealedSegments := make(map[int64]struct{}) for _, id := range s.segments { info := s.meta.GetHealthySegment(id) if info == nil || info.InsertChannel != channel { @@ -660,20 +667,27 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error { if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil { return err } + sealedSegments[id] = struct{}{} break } } } for channel, segmentInfos := range channelInfo { for _, policy := range s.channelSealPolicies { - vs := policy(channel, segmentInfos, ts) + vs, reason := policy(channel, segmentInfos, ts) for _, info := range vs { + if _, ok := sealedSegments[info.GetID()]; ok { + continue + } if info.State != commonpb.SegmentState_Growing { continue } if err := s.meta.SetState(info.GetID(), commonpb.SegmentState_Sealed); err != nil { return err } + log.Info("seal segment for channel seal policy matched", + zap.Int64("segmentID", info.GetID()), zap.String("channel", channel), zap.String("reason", reason)) + sealedSegments[info.GetID()] = struct{}{} } } } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index dfb407861d..7210e696a6 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2864,6 +2864,7 @@ type dataCoordConfig struct { SegmentMaxIdleTime ParamItem `refreshable:"false"` SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"` SegmentMaxBinlogFileNumber ParamItem `refreshable:"false"` + GrowingSegmentsMemSizeInMB ParamItem `refreshable:"true"` AutoUpgradeSegmentIndex ParamItem `refreshable:"true"` SegmentFlushInterval ParamItem `refreshable:"true"` @@ -3102,6 +3103,16 @@ the number of binlog file reaches to max value.`, } p.SegmentMaxBinlogFileNumber.Init(base.mgr) + p.GrowingSegmentsMemSizeInMB = ParamItem{ + Key: "dataCoord.sealPolicy.channel.growingSegmentsMemSize", + Version: "2.4.6", + DefaultValue: "4096", + Doc: `The size threshold in MB, if the total size of growing segments of each shard +exceeds this threshold, the largest growing segment will be 
sealed.`, + Export: true, + } + p.GrowingSegmentsMemSizeInMB.Init(base.mgr) + p.EnableCompaction = ParamItem{ Key: "dataCoord.enableCompaction", Version: "2.0.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index f495332ba7..c61f68245f 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -434,6 +434,7 @@ func TestComponentParam(t *testing.T) { assert.True(t, Params.EnableGarbageCollection.GetAsBool()) assert.Equal(t, Params.EnableActiveStandby.GetAsBool(), false) t.Logf("dataCoord EnableActiveStandby = %t", Params.EnableActiveStandby.GetAsBool()) + assert.Equal(t, int64(4096), Params.GrowingSegmentsMemSizeInMB.GetAsInt64()) assert.Equal(t, true, Params.AutoBalance.GetAsBool()) assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt()) diff --git a/tests/integration/sealpolicies/seal_by_total_growing_test.go b/tests/integration/sealpolicies/seal_by_total_growing_test.go new file mode 100644 index 0000000000..b049f87a86 --- /dev/null +++ b/tests/integration/sealpolicies/seal_by_total_growing_test.go @@ -0,0 +1,138 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
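+
+// This file exercises the sealByTotalGrowingSegmentsSize policy end to end:
+// it lowers dataCoord.sealPolicy.channel.growingSegmentsMemSize, inserts
+// enough rows to cross the threshold, and then waits for at least one
+// segment to be sealed and flushed.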
+ +package sealpolicies + +import ( + "context" + "time" + + "github.com/golang/protobuf/proto" + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" +) + +func (s *SealSuite) TestSealByTotalGrowingSegmentsSize() { + paramtable.Get().Save(paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.Key, "10") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.Key) + + paramtable.Get().Save(paramtable.Get().DataNodeCfg.SyncPeriod.Key, "5") + defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.SyncPeriod.Key) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + defer cancel() + c := s.Cluster + + const ( + dim = 128 + dbName = "" + rowNum = 100000 + + vecType = schemapb.DataType_FloatVector + ) + + collectionName := "TestSealByGrowingSegmentsSize_" + funcutil.GenRandomStr() + + schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, true, vecType) + schema.Fields = append(schema.Fields, &schemapb.FieldSchema{ + FieldID: 102, + Name: "pid", + DataType: schemapb.DataType_Int64, + IsPartitionKey: true, + }) + marshaledSchema, err := proto.Marshal(schema) + s.NoError(err) + + // create collection + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: common.DefaultShardsNum, + ConsistencyLevel: commonpb.ConsistencyLevel_Strong, + }) + err = merr.CheckRPCCall(createCollectionStatus, err) + s.NoError(err) + log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) + + // show collection + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + err = merr.CheckRPCCall(showCollectionsResp, err) + s.NoError(err) + log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) + + // insert + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + partitionKeyColumn := integration.NewInt64FieldDataWithStart("pid", rowNum, 0) + hashKeys := integration.GenerateHashKeys(rowNum) + insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ + DbName: dbName, + CollectionName: collectionName, + FieldsData: []*schemapb.FieldData{fVecColumn, partitionKeyColumn}, + HashKeys: hashKeys, + NumRows: uint32(rowNum), + }) + err = merr.CheckRPCCall(insertResult, err) + s.NoError(err) + s.Equal(int64(rowNum), insertResult.GetInsertCnt()) + + // wait for segment seal and flush + showSegments := func() bool { + var segments []*datapb.SegmentInfo + segments, err = c.MetaWatcher.ShowSegments() + s.NoError(err) + s.NotEmpty(segments) + flushedSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { + return segment.GetState() == commonpb.SegmentState_Flushed + }) + log.Info("ShowSegments result", zap.Int("len(segments)", len(segments)), + zap.Int("len(flushedSegments)", len(flushedSegments))) + return len(flushedSegments) >= 1 + } + for !showSegments() { + 
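+		// Re-check once per second; fail if the ten-minute context expires first.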
select { + case <-ctx.Done(): + s.Fail("waiting for segment sealed timeout") + return + case <-time.After(1 * time.Second): + } + } + + // release collection + status, err := c.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{ + CollectionName: collectionName, + }) + err = merr.CheckRPCCall(status, err) + s.NoError(err) + + // drop collection + status, err = c.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{ + CollectionName: collectionName, + }) + err = merr.CheckRPCCall(status, err) + s.NoError(err) +} diff --git a/tests/integration/sealpolicies/seal_policies_test.go b/tests/integration/sealpolicies/seal_policies_test.go new file mode 100644 index 0000000000..c8d5493b94 --- /dev/null +++ b/tests/integration/sealpolicies/seal_policies_test.go @@ -0,0 +1,44 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sealpolicies + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" +) + +type SealSuite struct { + integration.MiniClusterSuite +} + +func (s *SealSuite) SetupSuite() { + s.MiniClusterSuite.SetupSuite() + + paramtable.Init() +} + +func (s *SealSuite) TearDownSuite() { + s.MiniClusterSuite.TearDownSuite() +} + +func TestSealPolicies(t *testing.T) { + suite.Run(t, new(SealSuite)) +}