enhance: Add delete buffer related quota logic (#35918)

See also #35303

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/35993/head
congqixia 2024-09-05 11:39:03 +08:00 committed by GitHub
parent 80a9efd96d
commit 8593c4580a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 344 additions and 32 deletions

View File

@ -979,6 +979,14 @@ quotaAndLimits:
enabled: false # switch to enable l0 segment row count quota
lowWaterLevel: 32768 # l0 segment row count quota, low water level
highWaterLevel: 65536 # l0 segment row count quota, high water level
deleteBufferRowCountProtection:
enabled: false # switch to enable delete buffer row count quota
lowWaterLevel: 32768 # delete buffer row count quota, low water level
highWaterLevel: 65536 # delete buffer row count quota, high water level
deleteBufferSizeProtection:
enabled: false # switch to enable delete buffer size quota
lowWaterLevel: 134217728 # delete buffer size quota, low water level
highWaterLevel: 268435456 # delete buffer size quota, high water level
limitReading:
# forceDeny false means dql requests are allowed (except for some
# specific conditions, such as collection has been dropped), true means always reject all dql requests.

View File

@ -80,6 +80,7 @@ type ShardDelegator interface {
ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error
SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)
GetTargetVersion() int64
GetDeleteBufferSize() (entryNum int64, memorySize int64)
// manage exclude segments
AddExcludedSegments(excludeInfo map[int64]uint64)
@ -588,6 +589,10 @@ func (sd *shardDelegator) GetStatistics(ctx context.Context, req *querypb.GetSta
return results, nil
}
// GetDeleteBufferSize returns the current entry count and memory footprint
// (in bytes) of this delegator's delete buffer, by delegating to the
// underlying DeleteBuffer.Size implementation.
func (sd *shardDelegator) GetDeleteBufferSize() (entryNum int64, memorySize int64) {
	return sd.deleteBuffer.Size()
}
type subTask[T any] struct {
req T
targetID int64

View File

@ -28,6 +28,7 @@ var errBufferFull = errors.New("buffer full")
// timed is the element constraint for delete buffers. Each buffered entry
// exposes its timestamp, its memory footprint in bytes, and the number of
// delete records it carries.
type timed interface {
	Timestamp() uint64
	Size() int64
	// EntryNum reports how many delete entries (primary keys) this element holds.
	EntryNum() int64
}
// DeleteBuffer is the interface for delete buffer.
@ -36,6 +37,8 @@ type DeleteBuffer[T timed] interface {
ListAfter(uint64) []T
SafeTs() uint64
TryDiscard(uint64)
// Size returns current size information of delete buffer: entryNum and memory
Size() (entryNum, memorySize int64)
}
func NewDoubleCacheDeleteBuffer[T timed](startTs uint64, maxSize int64) DeleteBuffer[T] {
@ -86,31 +89,59 @@ func (c *doubleCacheBuffer[T]) ListAfter(ts uint64) []T {
return result
}
// Size returns the aggregated entry count and memory usage of the head and
// tail cache blocks. Nil blocks (not yet populated) contribute nothing.
func (c *doubleCacheBuffer[T]) Size() (entryNum int64, memorySize int64) {
	c.mut.RLock()
	defer c.mut.RUnlock()

	for _, block := range []*cacheBlock[T]{c.head, c.tail} {
		if block == nil {
			continue
		}
		num, size := block.Size()
		entryNum += num
		memorySize += size
	}
	return entryNum, memorySize
}
// evict sets head as tail and evicts tail: the previous head becomes the new
// tail, and a fresh head block is created seeded with the triggering entry.
//
// Fix: the merged diff left the old cacheBlock literal fields alongside the
// new ones, producing duplicate keys (headTs/maxSize/size/data each appeared
// twice) — Go rejects duplicate fields in a struct literal. Only the updated
// literal (which also tracks entryNum) is kept.
func (c *doubleCacheBuffer[T]) evict(newTs uint64, entry T) {
	c.tail = c.head
	c.head = &cacheBlock[T]{
		headTs:   newTs,
		maxSize:  c.maxSize / 2,
		size:     entry.Size(),
		entryNum: entry.EntryNum(),
		data:     []T{entry},
	}
	// Advance the buffer's low watermark to the (new) tail's head timestamp.
	c.ts = c.tail.headTs
}
// newCacheBlock builds a cacheBlock starting at timestamp ts with the given
// size cap, pre-populated with elements. The block's entryNum and size
// counters are initialized from the provided elements so that Size() is
// accurate from construction.
//
// Fix: the merged diff left the old struct-literal fields alongside the new
// ones (headTs/maxSize/data each appeared twice), which does not compile.
// Only the updated literal is kept.
func newCacheBlock[T timed](ts uint64, maxSize int64, elements ...T) *cacheBlock[T] {
	var entryNum, memorySize int64
	for _, element := range elements {
		entryNum += element.EntryNum()
		memorySize += element.Size()
	}
	return &cacheBlock[T]{
		headTs:   ts,
		maxSize:  maxSize,
		data:     elements,
		entryNum: entryNum,
		size:     memorySize,
	}
}
// cacheBlock is one segment of a delete buffer: an append-only list of timed
// entries starting at headTs, with running counters for entry count and
// memory size, bounded by maxSize.
//
// Fix: the merged diff duplicated the old field declarations next to the new
// ones (mut/headTs/size/maxSize each appeared twice), which does not compile.
// Only the updated field set (including entryNum) is kept.
type cacheBlock[T timed] struct {
	mut      sync.RWMutex // guards all mutable fields below
	headTs   uint64       // timestamp of the first entry in this block
	entryNum int64        // running total of delete entries across data
	size     int64        // running total of memory bytes across data
	maxSize  int64        // soft capacity limit in bytes

	data []T
}
@ -127,6 +158,7 @@ func (c *cacheBlock[T]) Put(entry T) error {
c.data = append(c.data, entry)
c.size += entry.Size()
c.entryNum += entry.EntryNum()
return nil
}
@ -143,3 +175,7 @@ func (c *cacheBlock[T]) ListAfter(ts uint64) []T {
}
return c.data[idx:]
}
// Size returns the block's current delete entry count and memory usage in
// bytes.
//
// Fix: entryNum and size are mutated by Put under c.mut, so reading them
// without the read lock is a data race under the race detector. Taking
// RLock here is safe: callers (doubleCacheBuffer.Size, listDeleteBuffer.Size)
// hold their own distinct mutexes, never this block's.
func (c *cacheBlock[T]) Size() (entryNum, memorySize int64) {
	c.mut.RLock()
	defer c.mut.RUnlock()
	return c.entryNum, c.size
}

View File

@ -110,6 +110,9 @@ func (s *DoubleCacheBufferSuite) TestPut() {
s.Equal(2, len(buffer.ListAfter(11)))
s.Equal(1, len(buffer.ListAfter(12)))
entryNum, memorySize := buffer.Size()
s.EqualValues(2, entryNum)
s.EqualValues(304, memorySize)
buffer.Put(&Item{
Ts: 13,
@ -128,6 +131,9 @@ func (s *DoubleCacheBufferSuite) TestPut() {
s.Equal(2, len(buffer.ListAfter(11)))
s.Equal(2, len(buffer.ListAfter(12)))
s.Equal(1, len(buffer.ListAfter(13)))
entryNum, memorySize = buffer.Size()
s.EqualValues(2, entryNum)
s.EqualValues(304, memorySize)
}
func TestDoubleCacheDeleteBuffer(t *testing.T) {

View File

@ -24,6 +24,12 @@ func (item *Item) Size() int64 {
}, int64(0))
}
// EntryNum returns the total number of delete entries carried by this item,
// summed over all of its buffered partition items.
func (item *Item) EntryNum() int64 {
	var total int64
	for _, bufferItem := range item.Data {
		total += bufferItem.EntryNum()
	}
	return total
}
type BufferItem struct {
PartitionID int64
DeleteData storage.DeleteData
@ -37,3 +43,7 @@ func (item *BufferItem) Size() int64 {
return int64(96) + pkSize + int64(8*len(item.DeleteData.Tss))
}
// EntryNum returns the number of delete records in this item, i.e. the count
// of buffered primary keys.
func (item *BufferItem) EntryNum() int64 {
	return int64(len(item.DeleteData.Pks))
}

View File

@ -15,9 +15,12 @@ func TestDeleteBufferItem(t *testing.T) {
}
assert.Equal(t, int64(96), item.Size())
assert.EqualValues(t, 0, item.EntryNum())
item.DeleteData.Pks = []storage.PrimaryKey{
storage.NewInt64PrimaryKey(10),
}
item.DeleteData.Tss = []uint64{2000}
assert.Equal(t, int64(120), item.Size())
assert.EqualValues(t, 1, item.EntryNum())
}

View File

@ -92,3 +92,15 @@ func (b *listDeleteBuffer[T]) TryDiscard(ts uint64) {
b.list = b.list[nextHead:]
}
}
// Size returns the aggregated entry count and memory usage across every
// cache block currently held by the list buffer.
func (b *listDeleteBuffer[T]) Size() (entryNum, memorySize int64) {
	b.mut.RLock()
	defer b.mut.RUnlock()

	var totalEntries, totalBytes int64
	for _, block := range b.list {
		num, size := block.Size()
		totalEntries += num
		totalBytes += size
	}
	return totalEntries, totalBytes
}

View File

@ -62,6 +62,9 @@ func (s *ListDeleteBufferSuite) TestCache() {
s.Equal(2, len(buffer.ListAfter(11)))
s.Equal(1, len(buffer.ListAfter(12)))
entryNum, memorySize := buffer.Size()
s.EqualValues(0, entryNum)
s.EqualValues(192, memorySize)
}
func (s *ListDeleteBufferSuite) TestTryDiscard() {
@ -95,18 +98,32 @@ func (s *ListDeleteBufferSuite) TestTryDiscard() {
})
s.Equal(2, len(buffer.ListAfter(10)))
entryNum, memorySize := buffer.Size()
s.EqualValues(2, entryNum)
s.EqualValues(240, memorySize)
buffer.TryDiscard(10)
s.Equal(2, len(buffer.ListAfter(10)), "equal ts shall not discard block")
entryNum, memorySize = buffer.Size()
s.EqualValues(2, entryNum)
s.EqualValues(240, memorySize)
buffer.TryDiscard(9)
s.Equal(2, len(buffer.ListAfter(10)), "history ts shall not discard any block")
entryNum, memorySize = buffer.Size()
s.EqualValues(2, entryNum)
s.EqualValues(240, memorySize)
buffer.TryDiscard(20)
s.Equal(1, len(buffer.ListAfter(10)), "first block shall be discarded")
entryNum, memorySize = buffer.Size()
s.EqualValues(1, entryNum)
s.EqualValues(120, memorySize)
buffer.TryDiscard(20)
s.Equal(1, len(buffer.ListAfter(10)), "discard will not happen if there is only one block")
s.EqualValues(1, entryNum)
s.EqualValues(120, memorySize)
}
func TestListDeleteBuffer(t *testing.T) {

View File

@ -134,6 +134,57 @@ func (_c *MockShardDelegator_Collection_Call) RunAndReturn(run func() int64) *Mo
return _c
}
// GetDeleteBufferSize provides a mock function with given fields:
// (mockery-generated; do not edit by hand — regenerate instead.)
func (_m *MockShardDelegator) GetDeleteBufferSize() (int64, int64) {
	ret := _m.Called()

	var r0 int64
	var r1 int64
	// Prefer a single stub returning both values, then fall back to
	// per-position stubs or concrete recorded return values.
	if rf, ok := ret.Get(0).(func() (int64, int64)); ok {
		return rf()
	}
	if rf, ok := ret.Get(0).(func() int64); ok {
		r0 = rf()
	} else {
		r0 = ret.Get(0).(int64)
	}

	if rf, ok := ret.Get(1).(func() int64); ok {
		r1 = rf()
	} else {
		r1 = ret.Get(1).(int64)
	}

	return r0, r1
}
// MockShardDelegator_GetDeleteBufferSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDeleteBufferSize'
// (mockery-generated helper type.)
type MockShardDelegator_GetDeleteBufferSize_Call struct {
	*mock.Call
}
// GetDeleteBufferSize is a helper method to define mock.On call
// (mockery-generated expecter entry point.)
func (_e *MockShardDelegator_Expecter) GetDeleteBufferSize() *MockShardDelegator_GetDeleteBufferSize_Call {
	return &MockShardDelegator_GetDeleteBufferSize_Call{Call: _e.mock.On("GetDeleteBufferSize")}
}
// Run registers a callback invoked when the mocked method is called.
// (mockery-generated.)
func (_c *MockShardDelegator_GetDeleteBufferSize_Call) Run(run func()) *MockShardDelegator_GetDeleteBufferSize_Call {
	_c.Call.Run(func(args mock.Arguments) {
		run()
	})
	return _c
}
// Return sets the concrete values the mocked method should yield.
// (mockery-generated.)
func (_c *MockShardDelegator_GetDeleteBufferSize_Call) Return(entryNum int64, memorySize int64) *MockShardDelegator_GetDeleteBufferSize_Call {
	_c.Call.Return(entryNum, memorySize)
	return _c
}
// RunAndReturn sets a stub function computing the mocked return values.
// (mockery-generated.)
func (_c *MockShardDelegator_GetDeleteBufferSize_Call) RunAndReturn(run func() (int64, int64)) *MockShardDelegator_GetDeleteBufferSize_Call {
	_c.Call.Return(run)
	return _c
}
// GetPartitionStatsVersions provides a mock function with given fields: ctx
func (_m *MockShardDelegator) GetPartitionStatsVersions(ctx context.Context) map[int64]int64 {
ret := _m.Called(ctx)

View File

@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/querynodev2/collector"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/hardware"
@ -125,6 +126,17 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
).Set(float64(numEntities))
}
deleteBufferNum := make(map[int64]int64)
deleteBufferSize := make(map[int64]int64)
node.delegators.Range(func(_ string, sd delegator.ShardDelegator) bool {
collectionID := sd.Collection()
entryNum, memorySize := sd.GetDeleteBufferSize()
deleteBufferNum[collectionID] += entryNum
deleteBufferSize[collectionID] += memorySize
return true
})
return &metricsinfo.QueryNodeQuotaMetrics{
Hms: metricsinfo.HardwareMetrics{},
Rms: rms,
@ -138,6 +150,10 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
NodeID: node.GetNodeID(),
CollectionIDs: collections,
},
DeleteBufferInfo: metricsinfo.DeleteBufferInfo{
CollectionDeleteBufferNum: deleteBufferNum,
CollectionDeleteBufferSize: deleteBufferSize,
},
}, nil
}

View File

@ -1749,9 +1749,32 @@ func (suite *ServiceSuite) TestGetMetric_Normal() {
Request: string(mReq),
}
sd1 := delegator.NewMockShardDelegator(suite.T())
sd1.EXPECT().Collection().Return(100)
sd1.EXPECT().GetDeleteBufferSize().Return(10, 1000)
sd1.EXPECT().Close().Maybe()
suite.node.delegators.Insert("qn_unitest_dml_0_100v0", sd1)
sd2 := delegator.NewMockShardDelegator(suite.T())
sd2.EXPECT().Collection().Return(100)
sd2.EXPECT().GetDeleteBufferSize().Return(10, 1000)
sd2.EXPECT().Close().Maybe()
suite.node.delegators.Insert("qn_unitest_dml_1_100v1", sd2)
resp, err := suite.node.GetMetrics(ctx, req)
err = merr.CheckRPCCall(resp, err)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
info := &metricsinfo.QueryNodeInfos{}
err = metricsinfo.UnmarshalComponentInfos(resp.GetResponse(), info)
suite.NoError(err)
entryNum, ok := info.QuotaMetrics.DeleteBufferInfo.CollectionDeleteBufferNum[100]
suite.True(ok)
suite.EqualValues(20, entryNum)
memorySize, ok := info.QuotaMetrics.DeleteBufferInfo.CollectionDeleteBufferSize[100]
suite.True(ok)
suite.EqualValues(2000, memorySize)
}
func (suite *ServiceSuite) TestGetMetric_Failed() {

View File

@ -757,6 +757,10 @@ func (q *QuotaCenter) calculateWriteRates() error {
updateCollectionFactor(growingSegFactors)
l0Factors := q.getL0SegmentsSizeFactor()
updateCollectionFactor(l0Factors)
deleteBufferRowCountFactors := q.getDeleteBufferRowCountFactor()
updateCollectionFactor(deleteBufferRowCountFactors)
deleteBufferSizeFactors := q.getDeleteBufferSizeFactor()
updateCollectionFactor(deleteBufferSizeFactors)
ttCollections := make([]int64, 0)
memoryCollections := make([]int64, 0)
@ -1034,6 +1038,61 @@ func (q *QuotaCenter) getL0SegmentsSizeFactor() map[int64]float64 {
return collectionFactor
}
// getDeleteBufferRowCountFactor returns per-collection write throttling
// factors based on the total delete buffer entry count reported by all
// querynodes. Returns nil when the protection is disabled. Collections below
// the low water level are unaffected; between the low and high water levels
// the factor falls off linearly towards zero.
//
// Fixes:
//  1. CollectionDeleteBufferSize (bytes) was also being summed into the
//     row-count map, mixing units — byte sizes are handled separately by
//     getDeleteBufferSizeFactor.
//  2. The factor loop iterated a hard-coded map[int64]int64{100: 1000}
//     (leftover debug data) instead of the aggregated per-collection counts.
func (q *QuotaCenter) getDeleteBufferRowCountFactor() map[int64]float64 {
	if !Params.QuotaConfig.DeleteBufferRowCountProtectionEnabled.GetAsBool() {
		return nil
	}
	lowWaterLevel := Params.QuotaConfig.DeleteBufferRowCountLowWaterLevel.GetAsInt64()
	highWaterLevel := Params.QuotaConfig.DeleteBufferRowCountHighWaterLevel.GetAsInt64()

	// Aggregate delete buffer entry counts per collection across querynodes.
	deleteBufferNum := make(map[int64]int64)
	for _, queryNodeMetrics := range q.queryNodeMetrics {
		for collectionID, num := range queryNodeMetrics.DeleteBufferInfo.CollectionDeleteBufferNum {
			deleteBufferNum[collectionID] += num
		}
	}

	collectionFactor := make(map[int64]float64)
	for collID, rowCount := range deleteBufferNum {
		if rowCount < lowWaterLevel {
			continue
		}
		// Linear falloff; goes non-positive above the high water level —
		// presumably clamped downstream by updateCollectionFactor (TODO confirm).
		factor := float64(highWaterLevel-rowCount) / float64(highWaterLevel-lowWaterLevel)
		collectionFactor[collID] = factor
	}
	return collectionFactor
}
// getDeleteBufferSizeFactor returns per-collection write throttling factors
// based on the total delete buffer memory size (bytes) reported by all
// querynodes. Returns nil when the protection is disabled. Collections below
// the low water level are unaffected; between the low and high water levels
// the factor falls off linearly towards zero.
//
// Fixes:
//  1. The factor loop iterated a hard-coded map[int64]int64{100: 1000}
//     (leftover debug data) instead of the aggregated per-collection sizes.
//  2. Locals were misleadingly named "RowCount" although they hold byte-size
//     water levels; renamed for clarity.
func (q *QuotaCenter) getDeleteBufferSizeFactor() map[int64]float64 {
	if !Params.QuotaConfig.DeleteBufferSizeProtectionEnabled.GetAsBool() {
		return nil
	}
	sizeLowWaterLevel := Params.QuotaConfig.DeleteBufferSizeLowWaterLevel.GetAsInt64()
	sizeHighWaterLevel := Params.QuotaConfig.DeleteBufferSizeHighWaterLevel.GetAsInt64()

	// Aggregate delete buffer byte sizes per collection across querynodes.
	deleteBufferSize := make(map[int64]int64)
	for _, queryNodeMetrics := range q.queryNodeMetrics {
		for collectionID, size := range queryNodeMetrics.DeleteBufferInfo.CollectionDeleteBufferSize {
			deleteBufferSize[collectionID] += size
		}
	}

	collectionFactor := make(map[int64]float64)
	for collID, bufferSize := range deleteBufferSize {
		if bufferSize < sizeLowWaterLevel {
			continue
		}
		// Linear falloff; goes non-positive above the high water level —
		// presumably clamped downstream by updateCollectionFactor (TODO confirm).
		factor := float64(sizeHighWaterLevel-bufferSize) / float64(sizeHighWaterLevel-sizeLowWaterLevel)
		collectionFactor[collID] = factor
	}
	return collectionFactor
}
// calculateRates calculates target rates by different strategies.
func (q *QuotaCenter) calculateRates() error {
err := q.resetAllCurrentRates()

View File

@ -62,6 +62,12 @@ type QueryNodeQuotaMetrics struct {
Fgm FlowGraphMetric
GrowingSegmentsSize int64
Effect NodeEffect
DeleteBufferInfo DeleteBufferInfo
}
// DeleteBufferInfo carries per-collection delete buffer statistics reported
// by a querynode, consumed by the quota center for write throttling.
type DeleteBufferInfo struct {
	// CollectionDeleteBufferNum maps collectionID -> total delete entry count.
	CollectionDeleteBufferNum map[int64]int64
	// CollectionDeleteBufferSize maps collectionID -> total buffer memory in bytes.
	CollectionDeleteBufferSize map[int64]int64
}
type DataCoordQuotaMetrics struct {

View File

@ -134,26 +134,32 @@ type quotaConfig struct {
MaxResourceGroupNumOfQueryNode ParamItem `refreshable:"true"`
// limit writing
ForceDenyWriting ParamItem `refreshable:"true"`
TtProtectionEnabled ParamItem `refreshable:"true"`
MaxTimeTickDelay ParamItem `refreshable:"true"`
MemProtectionEnabled ParamItem `refreshable:"true"`
DataNodeMemoryLowWaterLevel ParamItem `refreshable:"true"`
DataNodeMemoryHighWaterLevel ParamItem `refreshable:"true"`
QueryNodeMemoryLowWaterLevel ParamItem `refreshable:"true"`
QueryNodeMemoryHighWaterLevel ParamItem `refreshable:"true"`
GrowingSegmentsSizeProtectionEnabled ParamItem `refreshable:"true"`
GrowingSegmentsSizeMinRateRatio ParamItem `refreshable:"true"`
GrowingSegmentsSizeLowWaterLevel ParamItem `refreshable:"true"`
GrowingSegmentsSizeHighWaterLevel ParamItem `refreshable:"true"`
DiskProtectionEnabled ParamItem `refreshable:"true"`
DiskQuota ParamItem `refreshable:"true"`
DiskQuotaPerDB ParamItem `refreshable:"true"`
DiskQuotaPerCollection ParamItem `refreshable:"true"`
DiskQuotaPerPartition ParamItem `refreshable:"true"`
L0SegmentRowCountProtectionEnabled ParamItem `refreshable:"true"`
L0SegmentRowCountLowWaterLevel ParamItem `refreshable:"true"`
L0SegmentRowCountHighWaterLevel ParamItem `refreshable:"true"`
ForceDenyWriting ParamItem `refreshable:"true"`
TtProtectionEnabled ParamItem `refreshable:"true"`
MaxTimeTickDelay ParamItem `refreshable:"true"`
MemProtectionEnabled ParamItem `refreshable:"true"`
DataNodeMemoryLowWaterLevel ParamItem `refreshable:"true"`
DataNodeMemoryHighWaterLevel ParamItem `refreshable:"true"`
QueryNodeMemoryLowWaterLevel ParamItem `refreshable:"true"`
QueryNodeMemoryHighWaterLevel ParamItem `refreshable:"true"`
GrowingSegmentsSizeProtectionEnabled ParamItem `refreshable:"true"`
GrowingSegmentsSizeMinRateRatio ParamItem `refreshable:"true"`
GrowingSegmentsSizeLowWaterLevel ParamItem `refreshable:"true"`
GrowingSegmentsSizeHighWaterLevel ParamItem `refreshable:"true"`
DiskProtectionEnabled ParamItem `refreshable:"true"`
DiskQuota ParamItem `refreshable:"true"`
DiskQuotaPerDB ParamItem `refreshable:"true"`
DiskQuotaPerCollection ParamItem `refreshable:"true"`
DiskQuotaPerPartition ParamItem `refreshable:"true"`
L0SegmentRowCountProtectionEnabled ParamItem `refreshable:"true"`
L0SegmentRowCountLowWaterLevel ParamItem `refreshable:"true"`
L0SegmentRowCountHighWaterLevel ParamItem `refreshable:"true"`
DeleteBufferRowCountProtectionEnabled ParamItem `refreshable:"true"`
DeleteBufferRowCountLowWaterLevel ParamItem `refreshable:"true"`
DeleteBufferRowCountHighWaterLevel ParamItem `refreshable:"true"`
DeleteBufferSizeProtectionEnabled ParamItem `refreshable:"true"`
DeleteBufferSizeLowWaterLevel ParamItem `refreshable:"true"`
DeleteBufferSizeHighWaterLevel ParamItem `refreshable:"true"`
// limit reading
ForceDenyReading ParamItem `refreshable:"true"`
@ -1906,6 +1912,60 @@ but the rate will not be lower than minRateRatio * dmlRate.`,
}
p.L0SegmentRowCountHighWaterLevel.Init(base.mgr)
p.DeleteBufferRowCountProtectionEnabled = ParamItem{
Key: "quotaAndLimits.limitWriting.deleteBufferRowCountProtection.enabled",
Version: "2.4.11",
DefaultValue: "false",
Doc: "switch to enable delete buffer row count quota",
Export: true,
}
p.DeleteBufferRowCountProtectionEnabled.Init(base.mgr)
p.DeleteBufferRowCountLowWaterLevel = ParamItem{
Key: "quotaAndLimits.limitWriting.deleteBufferRowCountProtection.lowWaterLevel",
Version: "2.4.11",
DefaultValue: "32768",
Doc: "delete buffer row count quota, low water level",
Export: true,
}
p.DeleteBufferRowCountLowWaterLevel.Init(base.mgr)
p.DeleteBufferRowCountHighWaterLevel = ParamItem{
Key: "quotaAndLimits.limitWriting.deleteBufferRowCountProtection.highWaterLevel",
Version: "2.4.11",
DefaultValue: "65536",
Doc: "delete buffer row count quota, high water level",
Export: true,
}
p.DeleteBufferRowCountHighWaterLevel.Init(base.mgr)
p.DeleteBufferSizeProtectionEnabled = ParamItem{
Key: "quotaAndLimits.limitWriting.deleteBufferSizeProtection.enabled",
Version: "2.4.11",
DefaultValue: "false",
Doc: "switch to enable delete buffer size quota",
Export: true,
}
p.DeleteBufferSizeProtectionEnabled.Init(base.mgr)
p.DeleteBufferSizeLowWaterLevel = ParamItem{
Key: "quotaAndLimits.limitWriting.deleteBufferSizeProtection.lowWaterLevel",
Version: "2.4.11",
DefaultValue: "134217728", // 128MB
Doc: "delete buffer size quota, low water level",
Export: true,
}
p.DeleteBufferSizeLowWaterLevel.Init(base.mgr)
p.DeleteBufferSizeHighWaterLevel = ParamItem{
Key: "quotaAndLimits.limitWriting.deleteBufferSizeProtection.highWaterLevel",
Version: "2.4.11",
DefaultValue: "268435456", // 256MB
Doc: "delete buffer size quota, high water level",
Export: true,
}
p.DeleteBufferSizeHighWaterLevel.Init(base.mgr)
// limit reading
p.ForceDenyReading = ParamItem{
Key: "quotaAndLimits.limitReading.forceDeny",