enhance: Add `MemoryHighSyncPolicy` back to write buffer manager (#29997)

See also #27675

This PR adds back the `MemoryHighSyncPolicy` implementation. It also changes
`MinSegmentSize` & `CheckInterval` into configurable param items.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2024-01-31 19:03:04 +08:00 committed by GitHub
parent b5e078c4d3
commit fc0d007bd1
12 changed files with 579 additions and 84 deletions
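In outline, the restored mechanism works like this: the write buffer manager starts a background goroutine that wakes up every MemoryCheckInterval, sums the memory held by all registered write buffers, and once the total crosses the MemoryWatermark fraction of host memory, asks the largest buffer to evict its oldest segment buffers to the sync manager. A minimal, self-contained sketch of that loop (simplified names and types, not the actual Milvus code):

package main

import (
	"fmt"
	"time"
)

// buffer stands in for a per-channel WriteBuffer.
type buffer struct {
	channel string
	size    int64
}

// memoryCheck mimics bufferManager.memoryCheck: pick the largest buffer,
// and evict from it only when total usage crosses the watermark.
func memoryCheck(buffers []buffer, watermark int64) {
	var total int64
	var candidate *buffer
	for i := range buffers {
		total += buffers[i].size
		if candidate == nil || buffers[i].size > candidate.size {
			candidate = &buffers[i]
		}
	}
	if total < watermark || candidate == nil {
		return // memory level not high enough, skip force sync
	}
	fmt.Printf("evict oldest segments of %s (%d bytes buffered)\n", candidate.channel, candidate.size)
}

func main() {
	buffers := []buffer{{"ch-1", 512}, {"ch-2", 2048}}
	ticker := time.NewTicker(50 * time.Millisecond) // MemoryCheckInterval analogue
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		<-ticker.C
		memoryCheck(buffers, 1024) // watermark analogue
	}
}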

View File

@@ -372,6 +372,8 @@ func (node *DataNode) Start() error {
return
}
node.writeBufferManager.Start()
node.stopWaiter.Add(1)
go node.BackGroundGC(node.clearSignal)
@@ -426,6 +428,10 @@ func (node *DataNode) Stop() error {
return true
})
if node.writeBufferManager != nil {
node.writeBufferManager.Stop()
}
if node.allocator != nil {
log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole))
node.allocator.Close()

View File

@@ -122,7 +122,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
log.Info("receiving FlushSegments request")
err := node.writeBufferManager.FlushSegments(ctx, req.GetChannelName(), segmentIDs)
err := node.writeBufferManager.SealSegments(ctx, req.GetChannelName(), segmentIDs)
if err != nil {
log.Warn("failed to flush segments", zap.Error(err))
return merr.Status(err), nil

View File

@@ -3,6 +3,7 @@ package writebuffer
import (
"context"
"sync"
"time"
"go.uber.org/zap"
@@ -11,16 +12,20 @@ import (
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// BufferManager is the interface for WriteBuffer management.
type BufferManager interface {
// Register adds a WriteBuffer with provided schema & options.
Register(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, opts ...WriteBufferOption) error
// FlushSegments notifies writeBuffer corresponding to provided channel to flush segments.
FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error
// FlushChannel
// SealSegments notifies the writeBuffer corresponding to the provided channel to seal segments,
// which starts the flush procedure for those segments.
SealSegments(ctx context.Context, channel string, segmentIDs []int64) error
// FlushChannel sets the flushTs of the provided write buffer.
FlushChannel(ctx context.Context, channel string, flushTs uint64) error
// RemoveChannel removes a write buffer from manager.
RemoveChannel(channel string)
@@ -32,6 +37,11 @@ type BufferManager interface {
GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error)
// NotifyCheckpointUpdated notifies the write buffer that its checkpoint was updated, so flushTs can be reset.
NotifyCheckpointUpdated(channel string, ts uint64)
// Start starts the background checker goroutine.
Start()
// Stop stops the background checker and waits for the worker goroutine to quit.
Stop()
}
// NewManager returns initialized manager as `Manager`
@@ -39,6 +49,8 @@ func NewManager(syncMgr syncmgr.SyncManager) BufferManager {
return &bufferManager{
syncMgr: syncMgr,
buffers: make(map[string]WriteBuffer),
ch: lifetime.NewSafeChan(),
}
}
@@ -46,6 +58,80 @@ type bufferManager struct {
syncMgr syncmgr.SyncManager
buffers map[string]WriteBuffer
mut sync.RWMutex
wg sync.WaitGroup
ch lifetime.SafeChan
}
func (m *bufferManager) Start() {
m.wg.Add(1)
go func() {
defer m.wg.Done()
m.check()
}()
}
func (m *bufferManager) check() {
ticker := time.NewTimer(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond))
defer ticker.Stop()
for {
select {
case <-ticker.C:
m.memoryCheck()
ticker.Reset(paramtable.Get().DataNodeCfg.MemoryCheckInterval.GetAsDuration(time.Millisecond))
case <-m.ch.CloseCh():
log.Info("buffer manager memory check stopped")
return
}
}
}
// memoryCheck performs a check based on current memory usage & configuration.
func (m *bufferManager) memoryCheck() {
if !paramtable.Get().DataNodeCfg.MemoryForceSyncEnable.GetAsBool() {
return
}
m.mut.Lock()
defer m.mut.Unlock()
var total int64
var candidate WriteBuffer
var candiSize int64
var candiChan string
for chanName, buf := range m.buffers {
size := buf.MemorySize()
total += size
if size > candiSize {
candiSize = size
candidate = buf
candiChan = chanName
}
}
toMB := func(mem float64) float64 {
return mem / 1024 / 1024
}
totalMemory := hardware.GetMemoryCount()
memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryWatermark.GetAsFloat()
if float64(total) < memoryWatermark {
log.RatedDebug(20, "skip force sync because memory level is not high enough",
zap.Float64("current_total_memory_usage", toMB(float64(total))),
zap.Float64("current_memory_watermark", toMB(memoryWatermark)))
return
}
if candidate != nil {
candidate.EvictBuffer(GetOldestBufferPolicy(paramtable.Get().DataNodeCfg.MemoryForceSyncSegmentNum.GetAsInt()))
log.Info("notify writebuffer to sync",
zap.String("channel", candiChan), zap.Float64("bufferSize(MB)", toMB(float64(candiSize))))
}
}
func (m *bufferManager) Stop() {
m.ch.Close()
m.wg.Wait()
}
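Start and Stop give the manager a conventional Go lifecycle, and DataNode wires them into its own Start/Stop paths, as the first hunks above show. A hedged usage fragment (assumes an existing syncmgr.SyncManager value `syncMgr`; not compilable on its own):

mgr := writebuffer.NewManager(syncMgr)
mgr.Start()      // spawn the periodic memory check goroutine
defer mgr.Stop() // close the SafeChan and wait for the goroutine to exit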
// Register a new WriteBuffer for channel.
@@ -65,8 +151,8 @@ func (m *bufferManager) Register(channel string, metacache metacache.MetaCache,
return nil
}
// FlushSegments call sync segment and change segments state to Flushed.
func (m *bufferManager) FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error {
// SealSegments syncs the provided segments and changes their state to Flushed.
func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()
@@ -78,7 +164,7 @@ func (m *bufferManager) FlushSegments(ctx context.Context, channel string, segme
return merr.WrapErrChannelNotFound(channel)
}
return buf.FlushSegments(ctx, segmentIDs)
return buf.SealSegments(ctx, segmentIDs)
}
func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error {

View File

@@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -87,7 +88,7 @@ func (s *ManagerSuite) TestFlushSegments() {
s.Run("channel_not_found", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := manager.FlushSegments(ctx, s.channelName, []int64{1, 2, 3})
err := manager.SealSegments(ctx, s.channelName, []int64{1, 2, 3})
s.Error(err, "FlushSegments shall return error when channel not found")
})
@@ -101,9 +102,9 @@
s.manager.buffers[s.channelName] = wb
s.manager.mut.Unlock()
wb.EXPECT().FlushSegments(mock.Anything, mock.Anything).Return(nil)
wb.EXPECT().SealSegments(mock.Anything, mock.Anything).Return(nil)
err := manager.FlushSegments(ctx, s.channelName, []int64{1})
err := manager.SealSegments(ctx, s.channelName, []int64{1})
s.NoError(err)
})
}
@@ -192,6 +193,51 @@ func (s *ManagerSuite) TestRemoveChannel() {
})
}
func (s *ManagerSuite) TestMemoryCheck() {
manager := s.manager
param := paramtable.Get()
param.Save(param.DataNodeCfg.MemoryCheckInterval.Key, "50")
param.Save(param.DataNodeCfg.MemoryForceSyncEnable.Key, "false")
param.Save(param.DataNodeCfg.MemoryWatermark.Key, "0.7")
defer func() {
param.Reset(param.DataNodeCfg.MemoryCheckInterval.Key)
param.Reset(param.DataNodeCfg.MemoryForceSyncEnable.Key)
param.Reset(param.DataNodeCfg.MemoryWatermark.Key)
}()
wb := NewMockWriteBuffer(s.T())
memoryLimit := hardware.GetMemoryCount()
signal := make(chan struct{}, 1)
wb.EXPECT().MemorySize().Return(int64(float64(memoryLimit) * 0.6))
wb.EXPECT().EvictBuffer(mock.Anything).Run(func(policies ...SyncPolicy) {
select {
case signal <- struct{}{}:
default:
}
}).Return()
manager.mut.Lock()
manager.buffers[s.channelName] = wb
manager.mut.Unlock()
manager.Start()
defer manager.Stop()
<-time.After(time.Millisecond * 100)
wb.AssertNotCalled(s.T(), "MemorySize")
param.Save(param.DataNodeCfg.MemoryForceSyncEnable.Key, "true")
<-time.After(time.Millisecond * 100)
wb.AssertNotCalled(s.T(), "SetMemoryHighFlag")
param.Save(param.DataNodeCfg.MemoryWatermark.Key, "0.5")
<-signal
wb.AssertExpectations(s.T())
}
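The test detects the asynchronous EvictBuffer call with a buffered channel of capacity 1 and a non-blocking send: the mock callback may fire any number of times without blocking, while the test only needs "called at least once". The same pattern in isolation (a generic sketch, not tied to this suite):

signal := make(chan struct{}, 1)
notify := func() {
	select {
	case signal <- struct{}{}:
	default: // already signaled, drop
	}
}
notify()  // safe to call repeatedly
<-signal  // blocks until notify has run at least once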
func TestManager(t *testing.T) {
suite.Run(t, new(ManagerSuite))
}

View File

@@ -149,50 +149,6 @@ func (_c *MockBufferManager_FlushChannel_Call) RunAndReturn(run func(context.Con
return _c
}
// FlushSegments provides a mock function with given fields: ctx, channel, segmentIDs
func (_m *MockBufferManager) FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error {
ret := _m.Called(ctx, channel, segmentIDs)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, []int64) error); ok {
r0 = rf(ctx, channel, segmentIDs)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockBufferManager_FlushSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushSegments'
type MockBufferManager_FlushSegments_Call struct {
*mock.Call
}
// FlushSegments is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - segmentIDs []int64
func (_e *MockBufferManager_Expecter) FlushSegments(ctx interface{}, channel interface{}, segmentIDs interface{}) *MockBufferManager_FlushSegments_Call {
return &MockBufferManager_FlushSegments_Call{Call: _e.mock.On("FlushSegments", ctx, channel, segmentIDs)}
}
func (_c *MockBufferManager_FlushSegments_Call) Run(run func(ctx context.Context, channel string, segmentIDs []int64)) *MockBufferManager_FlushSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].([]int64))
})
return _c
}
func (_c *MockBufferManager_FlushSegments_Call) Return(_a0 error) *MockBufferManager_FlushSegments_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockBufferManager_FlushSegments_Call) RunAndReturn(run func(context.Context, string, []int64) error) *MockBufferManager_FlushSegments_Call {
_c.Call.Return(run)
return _c
}
// GetCheckpoint provides a mock function with given fields: channel
func (_m *MockBufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) {
ret := _m.Called(channel)
@@ -380,6 +336,114 @@ func (_c *MockBufferManager_RemoveChannel_Call) RunAndReturn(run func(string)) *
return _c
}
// SealSegments provides a mock function with given fields: ctx, channel, segmentIDs
func (_m *MockBufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error {
ret := _m.Called(ctx, channel, segmentIDs)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, []int64) error); ok {
r0 = rf(ctx, channel, segmentIDs)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockBufferManager_SealSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SealSegments'
type MockBufferManager_SealSegments_Call struct {
*mock.Call
}
// SealSegments is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - segmentIDs []int64
func (_e *MockBufferManager_Expecter) SealSegments(ctx interface{}, channel interface{}, segmentIDs interface{}) *MockBufferManager_SealSegments_Call {
return &MockBufferManager_SealSegments_Call{Call: _e.mock.On("SealSegments", ctx, channel, segmentIDs)}
}
func (_c *MockBufferManager_SealSegments_Call) Run(run func(ctx context.Context, channel string, segmentIDs []int64)) *MockBufferManager_SealSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string), args[2].([]int64))
})
return _c
}
func (_c *MockBufferManager_SealSegments_Call) Return(_a0 error) *MockBufferManager_SealSegments_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockBufferManager_SealSegments_Call) RunAndReturn(run func(context.Context, string, []int64) error) *MockBufferManager_SealSegments_Call {
_c.Call.Return(run)
return _c
}
// Start provides a mock function with given fields:
func (_m *MockBufferManager) Start() {
_m.Called()
}
// MockBufferManager_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
type MockBufferManager_Start_Call struct {
*mock.Call
}
// Start is a helper method to define mock.On call
func (_e *MockBufferManager_Expecter) Start() *MockBufferManager_Start_Call {
return &MockBufferManager_Start_Call{Call: _e.mock.On("Start")}
}
func (_c *MockBufferManager_Start_Call) Run(run func()) *MockBufferManager_Start_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockBufferManager_Start_Call) Return() *MockBufferManager_Start_Call {
_c.Call.Return()
return _c
}
func (_c *MockBufferManager_Start_Call) RunAndReturn(run func()) *MockBufferManager_Start_Call {
_c.Call.Return(run)
return _c
}
// Stop provides a mock function with given fields:
func (_m *MockBufferManager) Stop() {
_m.Called()
}
// MockBufferManager_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
type MockBufferManager_Stop_Call struct {
*mock.Call
}
// Stop is a helper method to define mock.On call
func (_e *MockBufferManager_Expecter) Stop() *MockBufferManager_Stop_Call {
return &MockBufferManager_Stop_Call{Call: _e.mock.On("Stop")}
}
func (_c *MockBufferManager_Stop_Call) Run(run func()) *MockBufferManager_Stop_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockBufferManager_Stop_Call) Return() *MockBufferManager_Stop_Call {
_c.Call.Return()
return _c
}
func (_c *MockBufferManager_Stop_Call) RunAndReturn(run func()) *MockBufferManager_Stop_Call {
_c.Call.Return(run)
return _c
}
// NewMockBufferManager creates a new instance of MockBufferManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockBufferManager(t interface {

View File

@@ -102,45 +102,48 @@ func (_c *MockWriteBuffer_Close_Call) RunAndReturn(run func(bool)) *MockWriteBuf
return _c
}
// FlushSegments provides a mock function with given fields: ctx, segmentIDs
func (_m *MockWriteBuffer) FlushSegments(ctx context.Context, segmentIDs []int64) error {
ret := _m.Called(ctx, segmentIDs)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok {
r0 = rf(ctx, segmentIDs)
} else {
r0 = ret.Error(0)
// EvictBuffer provides a mock function with given fields: policies
func (_m *MockWriteBuffer) EvictBuffer(policies ...SyncPolicy) {
_va := make([]interface{}, len(policies))
for _i := range policies {
_va[_i] = policies[_i]
}
return r0
var _ca []interface{}
_ca = append(_ca, _va...)
_m.Called(_ca...)
}
// MockWriteBuffer_FlushSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushSegments'
type MockWriteBuffer_FlushSegments_Call struct {
// MockWriteBuffer_EvictBuffer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EvictBuffer'
type MockWriteBuffer_EvictBuffer_Call struct {
*mock.Call
}
// FlushSegments is a helper method to define mock.On call
// - ctx context.Context
// - segmentIDs []int64
func (_e *MockWriteBuffer_Expecter) FlushSegments(ctx interface{}, segmentIDs interface{}) *MockWriteBuffer_FlushSegments_Call {
return &MockWriteBuffer_FlushSegments_Call{Call: _e.mock.On("FlushSegments", ctx, segmentIDs)}
// EvictBuffer is a helper method to define mock.On call
// - policies ...SyncPolicy
func (_e *MockWriteBuffer_Expecter) EvictBuffer(policies ...interface{}) *MockWriteBuffer_EvictBuffer_Call {
return &MockWriteBuffer_EvictBuffer_Call{Call: _e.mock.On("EvictBuffer",
append([]interface{}{}, policies...)...)}
}
func (_c *MockWriteBuffer_FlushSegments_Call) Run(run func(ctx context.Context, segmentIDs []int64)) *MockWriteBuffer_FlushSegments_Call {
func (_c *MockWriteBuffer_EvictBuffer_Call) Run(run func(policies ...SyncPolicy)) *MockWriteBuffer_EvictBuffer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]int64))
variadicArgs := make([]SyncPolicy, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(SyncPolicy)
}
}
run(variadicArgs...)
})
return _c
}
func (_c *MockWriteBuffer_FlushSegments_Call) Return(_a0 error) *MockWriteBuffer_FlushSegments_Call {
_c.Call.Return(_a0)
func (_c *MockWriteBuffer_EvictBuffer_Call) Return() *MockWriteBuffer_EvictBuffer_Call {
_c.Call.Return()
return _c
}
func (_c *MockWriteBuffer_FlushSegments_Call) RunAndReturn(run func(context.Context, []int64) error) *MockWriteBuffer_FlushSegments_Call {
func (_c *MockWriteBuffer_EvictBuffer_Call) RunAndReturn(run func(...SyncPolicy)) *MockWriteBuffer_EvictBuffer_Call {
_c.Call.Return(run)
return _c
}
@@ -271,6 +274,90 @@ func (_c *MockWriteBuffer_HasSegment_Call) RunAndReturn(run func(int64) bool) *M
return _c
}
// MemorySize provides a mock function with given fields:
func (_m *MockWriteBuffer) MemorySize() int64 {
ret := _m.Called()
var r0 int64
if rf, ok := ret.Get(0).(func() int64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(int64)
}
return r0
}
// MockWriteBuffer_MemorySize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MemorySize'
type MockWriteBuffer_MemorySize_Call struct {
*mock.Call
}
// MemorySize is a helper method to define mock.On call
func (_e *MockWriteBuffer_Expecter) MemorySize() *MockWriteBuffer_MemorySize_Call {
return &MockWriteBuffer_MemorySize_Call{Call: _e.mock.On("MemorySize")}
}
func (_c *MockWriteBuffer_MemorySize_Call) Run(run func()) *MockWriteBuffer_MemorySize_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockWriteBuffer_MemorySize_Call) Return(_a0 int64) *MockWriteBuffer_MemorySize_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockWriteBuffer_MemorySize_Call) RunAndReturn(run func() int64) *MockWriteBuffer_MemorySize_Call {
_c.Call.Return(run)
return _c
}
// SealSegments provides a mock function with given fields: ctx, segmentIDs
func (_m *MockWriteBuffer) SealSegments(ctx context.Context, segmentIDs []int64) error {
ret := _m.Called(ctx, segmentIDs)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, []int64) error); ok {
r0 = rf(ctx, segmentIDs)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockWriteBuffer_SealSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SealSegments'
type MockWriteBuffer_SealSegments_Call struct {
*mock.Call
}
// SealSegments is a helper method to define mock.On call
// - ctx context.Context
// - segmentIDs []int64
func (_e *MockWriteBuffer_Expecter) SealSegments(ctx interface{}, segmentIDs interface{}) *MockWriteBuffer_SealSegments_Call {
return &MockWriteBuffer_SealSegments_Call{Call: _e.mock.On("SealSegments", ctx, segmentIDs)}
}
func (_c *MockWriteBuffer_SealSegments_Call) Run(run func(ctx context.Context, segmentIDs []int64)) *MockWriteBuffer_SealSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].([]int64))
})
return _c
}
func (_c *MockWriteBuffer_SealSegments_Call) Return(_a0 error) *MockWriteBuffer_SealSegments_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockWriteBuffer_SealSegments_Call) RunAndReturn(run func(context.Context, []int64) error) *MockWriteBuffer_SealSegments_Call {
_c.Call.Return(run)
return _c
}
// SetFlushTimestamp provides a mock function with given fields: flushTs
func (_m *MockWriteBuffer) SetFlushTimestamp(flushTs uint64) {
_m.Called(flushTs)

View File

@@ -3,9 +3,12 @@ package writebuffer
import (
"math"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -37,6 +40,7 @@ func (buf *segmentBuffer) Yield() (insert *storage.InsertData, delete *storage.D
}
func (buf *segmentBuffer) MinTimestamp() typeutil.Timestamp {
log.Info("segmentID", zap.Int64("segmentID", buf.segmentID))
insertTs := buf.insertBuffer.MinTimestamp()
deltaTs := buf.deltaBuffer.MinTimestamp()
@@ -65,6 +69,11 @@ func (buf *segmentBuffer) GetTimeRange() *TimeRange {
return result
}
// MemorySize returns total memory size of insert buffer & delta buffer.
func (buf *segmentBuffer) MemorySize() int64 {
return buf.insertBuffer.size + buf.deltaBuffer.size
}
// TimeRange is a range of timestamps containing the min-timestamp and max-timestamp
type TimeRange struct {
timestampMin typeutil.Timestamp

View File

@@ -1,6 +1,7 @@
package writebuffer
import (
"container/heap"
"math/rand"
"time"
@@ -97,3 +98,40 @@ func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) S
return nil
}, "flush ts")
}
func GetOldestBufferPolicy(num int) SyncPolicy {
return wrapSelectSegmentFuncPolicy(func(buffers []*segmentBuffer, ts typeutil.Timestamp) []int64 {
h := &SegStartPosHeap{}
heap.Init(h)
for _, buf := range buffers {
heap.Push(h, buf)
if h.Len() > num {
heap.Pop(h)
}
}
return lo.Map(*h, func(buf *segmentBuffer, _ int) int64 { return buf.segmentID })
}, "oldest buffers")
}
// SegStartPosHeap implements a max-heap of segment buffers keyed by start position, for sorting.
type SegStartPosHeap []*segmentBuffer
func (h SegStartPosHeap) Len() int { return len(h) }
func (h SegStartPosHeap) Less(i, j int) bool {
return h[i].MinTimestamp() > h[j].MinTimestamp()
}
func (h SegStartPosHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *SegStartPosHeap) Push(x any) {
*h = append(*h, x.(*segmentBuffer))
}
func (h *SegStartPosHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
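GetOldestBufferPolicy relies on the classic bounded-heap top-k trick: every buffer is pushed onto a max-heap keyed by MinTimestamp, and whenever the heap grows past num the newest element is popped, so exactly the num oldest buffers remain. The same technique on plain ints, as a standalone demo (illustrative only, not Milvus code):

package main

import (
	"container/heap"
	"fmt"
)

// maxHeap is a max-heap of ints: Less reports greater-than so the
// largest value sits on top, mirroring SegStartPosHeap above.
type maxHeap []int

func (h maxHeap) Len() int           { return len(h) }
func (h maxHeap) Less(i, j int) bool { return h[i] > h[j] }
func (h maxHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *maxHeap) Push(x any) { *h = append(*h, x.(int)) }

func (h *maxHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// kOldest keeps at most k timestamps in the heap; once the heap grows
// past k, the newest (largest) is popped, so the k oldest survive.
func kOldest(timestamps []int, k int) []int {
	h := &maxHeap{}
	heap.Init(h)
	for _, ts := range timestamps {
		heap.Push(h, ts)
		if h.Len() > k {
			heap.Pop(h)
		}
	}
	return *h
}

func main() {
	fmt.Println(kOldest([]int{3, 1, 2}, 2)) // the two oldest: 1 and 2 (heap order)
}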

View File

@@ -94,6 +94,50 @@ func (s *SyncPolicySuite) TestCompactedSegmentsPolicy() {
s.ElementsMatch(ids, result)
}
func (s *SyncPolicySuite) TestOlderBufferPolicy() {
policy := GetOldestBufferPolicy(2)
type testCase struct {
tag string
buffers []*segmentBuffer
expect []int64
}
cases := []*testCase{
{tag: "empty_buffers", buffers: nil, expect: []int64{}},
{tag: "3_candidates", buffers: []*segmentBuffer{
{
segmentID: 100,
insertBuffer: &InsertBuffer{BufferBase: BufferBase{startPos: &msgpb.MsgPosition{Timestamp: 1}}},
deltaBuffer: &DeltaBuffer{BufferBase: BufferBase{}},
},
{
segmentID: 200,
insertBuffer: &InsertBuffer{BufferBase: BufferBase{startPos: &msgpb.MsgPosition{Timestamp: 2}}},
deltaBuffer: &DeltaBuffer{BufferBase: BufferBase{}},
},
{
segmentID: 300,
insertBuffer: &InsertBuffer{BufferBase: BufferBase{startPos: &msgpb.MsgPosition{Timestamp: 3}}},
deltaBuffer: &DeltaBuffer{BufferBase: BufferBase{}},
},
}, expect: []int64{100, 200}},
{tag: "1_candidates", buffers: []*segmentBuffer{
{
segmentID: 100,
insertBuffer: &InsertBuffer{BufferBase: BufferBase{startPos: &msgpb.MsgPosition{Timestamp: 1}}},
deltaBuffer: &DeltaBuffer{BufferBase: BufferBase{}},
},
}, expect: []int64{100}},
}
for _, tc := range cases {
s.Run(tc.tag, func() {
s.ElementsMatch(tc.expect, policy.SelectSegments(tc.buffers, 0))
})
}
}
func TestSyncPolicy(t *testing.T) {
suite.Run(t, new(SyncPolicySuite))
}

View File

@@ -44,12 +44,16 @@ type WriteBuffer interface {
SetFlushTimestamp(flushTs uint64)
// GetFlushTimestamp get current flush timestamp
GetFlushTimestamp() uint64
// FlushSegments is the method to perform `Sync` operation with provided options.
FlushSegments(ctx context.Context, segmentIDs []int64) error
// SealSegments is the method to perform the `Sync` operation on the provided segments.
SealSegments(ctx context.Context, segmentIDs []int64) error
// GetCheckpoint returns current channel checkpoint.
// If there is any non-empty segment buffer, returns the earliest buffer start position.
// Otherwise, returns latest buffered checkpoint.
GetCheckpoint() *msgpb.MsgPosition
// MemorySize returns the size in bytes currently used by this write buffer.
MemorySize() int64
// EvictBuffer evicts buffer to sync manager which match provided sync policies.
EvictBuffer(policies ...SyncPolicy)
// Close is the method to close and sink current buffer data.
Close(drop bool)
}
@@ -147,7 +151,7 @@ func (wb *writeBufferBase) HasSegment(segmentID int64) bool {
return ok
}
func (wb *writeBufferBase) FlushSegments(ctx context.Context, segmentIDs []int64) error {
func (wb *writeBufferBase) SealSegments(ctx context.Context, segmentIDs []int64) error {
wb.mut.RLock()
defer wb.mut.RUnlock()
@@ -162,6 +166,40 @@ func (wb *writeBufferBase) GetFlushTimestamp() uint64 {
return wb.flushTimestamp.Load()
}
func (wb *writeBufferBase) MemorySize() int64 {
wb.mut.RLock()
defer wb.mut.RUnlock()
var size int64
for _, segBuf := range wb.buffers {
size += segBuf.MemorySize()
}
return size
}
func (wb *writeBufferBase) EvictBuffer(policies ...SyncPolicy) {
wb.mut.Lock()
defer wb.mut.Unlock()
log := log.Ctx(context.Background()).With(
zap.Int64("collectionID", wb.collectionID),
zap.String("channel", wb.channelName),
)
// need valid checkpoint before triggering syncing
if wb.checkpoint == nil {
log.Warn("evict buffer before buffering data")
return
}
ts := wb.checkpoint.GetTimestamp()
segmentIDs := wb.getSegmentsToSync(ts, policies...)
if len(segmentIDs) > 0 {
log.Info("evict buffer find segments to sync", zap.Int64s("segmentIDs", segmentIDs))
wb.syncSegments(context.Background(), segmentIDs)
}
}
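EvictBuffer is the entry point bufferManager.memoryCheck uses under memory pressure; any combination of sync policies can be passed. A hedged fragment of the call pattern (not standalone):

// Force-sync the single oldest segment buffer of this write buffer,
// mirroring what bufferManager.memoryCheck does above.
wb.EvictBuffer(GetOldestBufferPolicy(1))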
func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
log := log.Ctx(context.Background()).
With(zap.String("channel", wb.channelName)).
@@ -225,7 +263,7 @@ func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
}
func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) {
segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp())
segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp(), wb.syncPolicies...)
if len(segmentsToSync) > 0 {
log.Info("write buffer get segments to sync", zap.Int64s("segmentIDs", segmentsToSync))
wb.syncSegments(context.Background(), segmentsToSync)
@@ -282,10 +320,10 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
// getSegmentsToSync applies all policies to get segments list to sync.
// **NOTE** shall be invoked within mutex protection
func (wb *writeBufferBase) getSegmentsToSync(ts typeutil.Timestamp) []int64 {
func (wb *writeBufferBase) getSegmentsToSync(ts typeutil.Timestamp, policies ...SyncPolicy) []int64 {
buffers := lo.Values(wb.buffers)
segments := typeutil.NewSet[int64]()
for _, policy := range wb.syncPolicies {
for _, policy := range policies {
result := policy.SelectSegments(buffers, ts)
if len(result) > 0 {
log.Info("SyncPolicy selects segments", zap.Int64s("segmentIDs", result), zap.String("reason", policy.Reason()))

View File

@@ -115,7 +115,7 @@ func (s *WriteBufferSuite) TestFlushSegments() {
wb, err := NewWriteBuffer(s.channelName, s.metacache, s.storageCache, s.syncMgr, WithDeletePolicy(DeletePolicyBFPkOracle))
s.NoError(err)
err = wb.FlushSegments(context.Background(), []int64{segmentID})
err = wb.SealSegments(context.Background(), []int64{segmentID})
s.NoError(err)
}
@@ -299,6 +299,73 @@ func (s *WriteBufferSuite) TestSyncSegmentsError() {
})
}
func (s *WriteBufferSuite) TestEvictBuffer() {
wb, err := newWriteBufferBase(s.channelName, s.metacache, s.storageCache, s.syncMgr, &writeBufferOption{
pkStatsFactory: func(vchannel *datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
},
})
s.Require().NoError(err)
serializer := syncmgr.NewMockSerializer(s.T())
wb.serializer = serializer
s.Run("no_checkpoint", func() {
wb.mut.Lock()
wb.buffers[100] = &segmentBuffer{}
wb.mut.Unlock()
defer func() {
wb.mut.Lock()
defer wb.mut.Unlock()
wb.buffers = make(map[int64]*segmentBuffer)
}()
wb.EvictBuffer(GetOldestBufferPolicy(1))
serializer.AssertNotCalled(s.T(), "EncodeBuffer")
})
s.Run("trigger_sync", func() {
buf1, err := newSegmentBuffer(2, s.collSchema)
s.Require().NoError(err)
buf1.insertBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 440,
}
buf1.deltaBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 400,
}
buf2, err := newSegmentBuffer(3, s.collSchema)
s.Require().NoError(err)
buf2.insertBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 550,
}
buf2.deltaBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 600,
}
wb.mut.Lock()
wb.buffers[2] = buf1
wb.buffers[3] = buf2
wb.checkpoint = &msgpb.MsgPosition{Timestamp: 100}
wb.mut.Unlock()
segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{
ID: 2,
}, nil)
s.metacache.EXPECT().GetSegmentByID(int64(2)).Return(segment, true)
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil)
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil)
defer func() {
s.wb.mut.Lock()
defer s.wb.mut.Unlock()
s.wb.buffers = make(map[int64]*segmentBuffer)
}()
wb.EvictBuffer(GetOldestBufferPolicy(1))
})
}
func TestWriteBufferBase(t *testing.T) {
suite.Run(t, new(WriteBufferSuite))
}

View File

@@ -2827,6 +2827,7 @@ type dataNodeConfig struct {
// memory management
MemoryForceSyncEnable ParamItem `refreshable:"true"`
MemoryForceSyncSegmentNum ParamItem `refreshable:"true"`
MemoryCheckInterval ParamItem `refreshable:"true"`
MemoryWatermark ParamItem `refreshable:"true"`
DataNodeTimeTickByRPC ParamItem `refreshable:"false"`
@@ -2939,6 +2940,15 @@ func (p *dataNodeConfig) init(base *BaseTable) {
}
p.MemoryForceSyncSegmentNum.Init(base.mgr)
p.MemoryCheckInterval = ParamItem{
Key: "datanode.memory.checkInterval",
Version: "2.4.0",
DefaultValue: "3000", // milliseconds
Doc: "the interval to check datanode memory usage, in milliseconds",
Export: true,
}
p.MemoryCheckInterval.Init(base.mgr)
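Because the item is declared refreshable, it can also be overridden at runtime through the paramtable, exactly as TestMemoryCheck does above; a hedged fragment:

param := paramtable.Get()
// check memory every 50 ms instead of the 3000 ms default
param.Save(param.DataNodeCfg.MemoryCheckInterval.Key, "50")
defer param.Reset(param.DataNodeCfg.MemoryCheckInterval.Key)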
if os.Getenv(metricsinfo.DeployModeEnvKey) == metricsinfo.StandaloneDeployMode {
p.MemoryWatermark = ParamItem{
Key: "datanode.memory.watermarkStandalone",