enhance: Seal by total growing segments size (#34692) (#34779)

Seals the largest growing segment if the total size of growing segments
of each shard exceeds the size threshold(default 4GB). Introducing this
policy can help keep the size of growing segments within a suitable
level, alleviating the pressure on the delegator.

issue: https://github.com/milvus-io/milvus/issues/34554

pr: https://github.com/milvus-io/milvus/pull/34692

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/34859/head
yihao.dai 2024-07-19 18:25:50 +08:00 committed by GitHub
parent 8632582520
commit 07bc1b6717
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 285 additions and 9 deletions

View File

@ -450,6 +450,11 @@ dataCoord:
# During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%.
expansionRate: 1.25
segmentFlushInterval: 2 # the minimal interval duration(unit: Seconds) between flusing operation on same segment
sealPolicy:
channel:
# The size threshold in MB, if the total size of growing segments of each shard
# exceeds this threshold, the largest growing segment will be sealed.
growingSegmentsMemSize: 4096
autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version
enableCompaction: true # Enable data segment compaction
compaction:

View File

@ -22,6 +22,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@ -169,20 +170,47 @@ func sealL1SegmentByIdleTime(idleTimeTolerance time.Duration, minSizeToSealIdleS
}
// channelSealPolicy seal policy applies to channel
type channelSealPolicy func(string, []*SegmentInfo, Timestamp) []*SegmentInfo
type channelSealPolicy func(string, []*SegmentInfo, Timestamp) ([]*SegmentInfo, string)
// getChannelOpenSegCapacityPolicy get channelSealPolicy with channel segment capacity policy
func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy {
return func(channel string, segs []*SegmentInfo, ts Timestamp) []*SegmentInfo {
return func(channel string, segs []*SegmentInfo, ts Timestamp) ([]*SegmentInfo, string) {
if len(segs) <= limit {
return []*SegmentInfo{}
return []*SegmentInfo{}, ""
}
sortSegmentsByLastExpires(segs)
offLen := len(segs) - limit
if offLen > len(segs) {
offLen = len(segs)
}
return segs[0:offLen]
return segs[0:offLen], fmt.Sprintf("seal by channel segment capacity, len(segs)=%d, limit=%d", len(segs), limit)
}
}
// sealByTotalGrowingSegmentsSize seals the largest growing segment
// if the total size of growing segments exceeds the threshold.
func sealByTotalGrowingSegmentsSize() channelSealPolicy {
return func(channel string, segments []*SegmentInfo, ts Timestamp) ([]*SegmentInfo, string) {
growingSegments := lo.Filter(segments, func(segment *SegmentInfo, _ int) bool {
return segment != nil && segment.GetState() == commonpb.SegmentState_Growing
})
var totalSize int64
sizeMap := lo.SliceToMap(growingSegments, func(segment *SegmentInfo) (int64, int64) {
size := segment.getSegmentSize()
totalSize += size
return segment.GetID(), size
})
threshold := paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.GetAsInt64() * 1024 * 1024
if totalSize >= threshold {
target := lo.MaxBy(growingSegments, func(s1, s2 *SegmentInfo) bool {
return sizeMap[s1.GetID()] > sizeMap[s2.GetID()]
})
return []*SegmentInfo{target}, fmt.Sprintf("seal by total growing segments size, "+
"totalSize=%d, threshold=%d", totalSize, threshold)
}
return nil, ""
}
}

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
@ -132,7 +133,7 @@ func TestGetChannelOpenSegCapacityPolicy(t *testing.T) {
},
}
for _, c := range testCases {
result := p(c.channel, c.segments, c.ts)
result, _ := p(c.channel, c.segments, c.ts)
if c.validator != nil {
assert.True(t, c.validator(result))
}
@ -195,3 +196,37 @@ func Test_sealLongTimeIdlePolicy(t *testing.T) {
shouldSeal, _ = policy.ShouldSeal(seg3, 100)
assert.True(t, shouldSeal)
}
func Test_sealByTotalGrowingSegmentsSize(t *testing.T) {
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.Key, "100")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.Key)
seg0 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: 0,
State: commonpb.SegmentState_Growing,
Binlogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{MemorySize: 30 * MB}}}},
}}
seg1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: 1,
State: commonpb.SegmentState_Growing,
Binlogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{MemorySize: 40 * MB}}}},
}}
seg2 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: 2,
State: commonpb.SegmentState_Growing,
Binlogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{MemorySize: 50 * MB}}}},
}}
seg3 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: 3,
State: commonpb.SegmentState_Sealed,
}}
fn := sealByTotalGrowingSegmentsSize()
// size not reach threshold
res, _ := fn("ch-0", []*SegmentInfo{seg0}, 0)
assert.Equal(t, 0, len(res))
// size reached the threshold
res, _ = fn("ch-0", []*SegmentInfo{seg0, seg1, seg2, seg3}, 0)
assert.Equal(t, 1, len(res))
assert.Equal(t, seg2.GetID(), res[0].GetID())
}

View File

@ -466,7 +466,7 @@ func SetLevel(level datapb.SegmentLevel) SegmentInfoOption {
}
func (s *SegmentInfo) getSegmentSize() int64 {
if s.size.Load() <= 0 {
if s.size.Load() <= 0 || s.GetState() == commonpb.SegmentState_Growing {
var size int64
for _, binlogs := range s.GetBinlogs() {
for _, l := range binlogs.GetBinlogs() {

View File

@ -198,6 +198,12 @@ func defaultSegmentSealPolicy() []SegmentSealPolicy {
}
}
func defaultChannelSealPolicy() []channelSealPolicy {
return []channelSealPolicy{
sealByTotalGrowingSegmentsSize(),
}
}
func defaultFlushPolicy() flushPolicy {
return flushPolicyL1
}
@ -211,8 +217,8 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) (*S
segments: make([]UniqueID, 0),
estimatePolicy: defaultCalUpperLimitPolicy(),
allocPolicy: defaultAllocatePolicy(),
segmentSealPolicies: defaultSegmentSealPolicy(), // default only segment size policy
channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy
segmentSealPolicies: defaultSegmentSealPolicy(),
channelSealPolicies: defaultChannelSealPolicy(),
flushPolicy: defaultFlushPolicy(),
}
for _, opt := range opts {
@ -644,6 +650,7 @@ func isEmptySealedSegment(segment *SegmentInfo, ts Timestamp) bool {
// tryToSealSegment applies segment & channel seal policies
func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
channelInfo := make(map[string][]*SegmentInfo)
sealedSegments := make(map[int64]struct{})
for _, id := range s.segments {
info := s.meta.GetHealthySegment(id)
if info == nil || info.InsertChannel != channel {
@ -660,20 +667,27 @@ func (s *SegmentManager) tryToSealSegment(ts Timestamp, channel string) error {
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
return err
}
sealedSegments[id] = struct{}{}
break
}
}
}
for channel, segmentInfos := range channelInfo {
for _, policy := range s.channelSealPolicies {
vs := policy(channel, segmentInfos, ts)
vs, reason := policy(channel, segmentInfos, ts)
for _, info := range vs {
if _, ok := sealedSegments[info.GetID()]; ok {
continue
}
if info.State != commonpb.SegmentState_Growing {
continue
}
if err := s.meta.SetState(info.GetID(), commonpb.SegmentState_Sealed); err != nil {
return err
}
log.Info("seal segment for channel seal policy matched",
zap.Int64("segmentID", info.GetID()), zap.String("channel", channel), zap.String("reason", reason))
sealedSegments[info.GetID()] = struct{}{}
}
}
}

View File

@ -2864,6 +2864,7 @@ type dataCoordConfig struct {
SegmentMaxIdleTime ParamItem `refreshable:"false"`
SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"`
SegmentMaxBinlogFileNumber ParamItem `refreshable:"false"`
GrowingSegmentsMemSizeInMB ParamItem `refreshable:"true"`
AutoUpgradeSegmentIndex ParamItem `refreshable:"true"`
SegmentFlushInterval ParamItem `refreshable:"true"`
@ -3102,6 +3103,16 @@ the number of binlog file reaches to max value.`,
}
p.SegmentMaxBinlogFileNumber.Init(base.mgr)
p.GrowingSegmentsMemSizeInMB = ParamItem{
Key: "dataCoord.sealPolicy.channel.growingSegmentsMemSize",
Version: "2.4.6",
DefaultValue: "4096",
Doc: `The size threshold in MB, if the total size of growing segments of each shard
exceeds this threshold, the largest growing segment will be sealed.`,
Export: true,
}
p.GrowingSegmentsMemSizeInMB.Init(base.mgr)
p.EnableCompaction = ParamItem{
Key: "dataCoord.enableCompaction",
Version: "2.0.0",

View File

@ -434,6 +434,7 @@ func TestComponentParam(t *testing.T) {
assert.True(t, Params.EnableGarbageCollection.GetAsBool())
assert.Equal(t, Params.EnableActiveStandby.GetAsBool(), false)
t.Logf("dataCoord EnableActiveStandby = %t", Params.EnableActiveStandby.GetAsBool())
assert.Equal(t, int64(4096), Params.GrowingSegmentsMemSizeInMB.GetAsInt64())
assert.Equal(t, true, Params.AutoBalance.GetAsBool())
assert.Equal(t, 10, Params.CheckAutoBalanceConfigInterval.GetAsInt())

View File

@ -0,0 +1,138 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sealpolicies
import (
"context"
"time"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/tests/integration"
)
func (s *SealSuite) TestSealByTotalGrowingSegmentsSize() {
paramtable.Get().Save(paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.Key, "10")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GrowingSegmentsMemSizeInMB.Key)
paramtable.Get().Save(paramtable.Get().DataNodeCfg.SyncPeriod.Key, "5")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.SyncPeriod.Key)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
c := s.Cluster
const (
dim = 128
dbName = ""
rowNum = 100000
vecType = schemapb.DataType_FloatVector
)
collectionName := "TestSealByGrowingSegmentsSize_" + funcutil.GenRandomStr()
schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, true, vecType)
schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
FieldID: 102,
Name: "pid",
DataType: schemapb.DataType_Int64,
IsPartitionKey: true,
})
marshaledSchema, err := proto.Marshal(schema)
s.NoError(err)
// create collection
createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
ShardsNum: common.DefaultShardsNum,
ConsistencyLevel: commonpb.ConsistencyLevel_Strong,
})
err = merr.CheckRPCCall(createCollectionStatus, err)
s.NoError(err)
log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
// show collection
showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
err = merr.CheckRPCCall(showCollectionsResp, err)
s.NoError(err)
log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
// insert
fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
partitionKeyColumn := integration.NewInt64FieldDataWithStart("pid", rowNum, 0)
hashKeys := integration.GenerateHashKeys(rowNum)
insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
DbName: dbName,
CollectionName: collectionName,
FieldsData: []*schemapb.FieldData{fVecColumn, partitionKeyColumn},
HashKeys: hashKeys,
NumRows: uint32(rowNum),
})
err = merr.CheckRPCCall(insertResult, err)
s.NoError(err)
s.Equal(int64(rowNum), insertResult.GetInsertCnt())
// wait for segment seal and flush
showSegments := func() bool {
var segments []*datapb.SegmentInfo
segments, err = c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
flushedSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
return segment.GetState() == commonpb.SegmentState_Flushed
})
log.Info("ShowSegments result", zap.Int("len(segments)", len(segments)),
zap.Int("len(flushedSegments)", len(flushedSegments)))
return len(flushedSegments) >= 1
}
for !showSegments() {
select {
case <-ctx.Done():
s.Fail("waiting for segment sealed timeout")
return
case <-time.After(1 * time.Second):
}
}
// release collection
status, err := c.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{
CollectionName: collectionName,
})
err = merr.CheckRPCCall(status, err)
s.NoError(err)
// drop collection
status, err = c.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{
CollectionName: collectionName,
})
err = merr.CheckRPCCall(status, err)
s.NoError(err)
}

View File

@ -0,0 +1,44 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sealpolicies
import (
"testing"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/tests/integration"
)
type SealSuite struct {
integration.MiniClusterSuite
}
func (s *SealSuite) SetupSuite() {
s.MiniClusterSuite.SetupSuite()
paramtable.Init()
}
func (s *SealSuite) TearDownSuite() {
s.MiniClusterSuite.TearDownSuite()
}
func TestSealPolicies(t *testing.T) {
suite.Run(t, new(SealSuite))
}