Move sync delete policy from deleteNode to insertBufferNode (#21152)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/21203/head
bigsheeper 2022-12-13 16:15:21 +08:00 committed by GitHub
parent a81c9cc11c
commit f595500383
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 179 additions and 79 deletions

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math"
"sync"
"go.uber.org/zap"
@ -40,6 +41,7 @@ import (
// but at the first stage, this struct is only used for delete buff
type DelBufferManager struct {
channel Channel
mu sync.Mutex // guards delMemorySize and delBufHeap
delMemorySize int64
delBufHeap *PriorityQueue
}
@ -97,6 +99,8 @@ func (bm *DelBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey,
delDataBuf.updateStartAndEndPosition(startPos, endPos)
//4. update and sync memory size with priority queue
bm.mu.Lock()
defer bm.mu.Unlock()
if !loaded {
delDataBuf.item.segmentID = segID
delDataBuf.item.memorySize = bufSize
@ -120,6 +124,8 @@ func (bm *DelBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok boo
}
func (bm *DelBufferManager) Delete(segID UniqueID) {
bm.mu.Lock()
defer bm.mu.Unlock()
if buf, ok := bm.channel.getCurDeleteBuffer(segID); ok {
item := buf.item
bm.delMemorySize -= item.memorySize
@ -141,6 +147,8 @@ func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFr
bm.Delete(segID)
}
}
bm.mu.Lock()
defer bm.mu.Unlock()
// only store delBuf if EntriesNum > 0
if compactToDelBuff.EntriesNum > 0 {
if loaded {
@ -156,6 +164,8 @@ func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFr
}
func (bm *DelBufferManager) ShouldFlushSegments() []UniqueID {
bm.mu.Lock()
defer bm.mu.Unlock()
var shouldFlushSegments []UniqueID
if bm.delMemorySize < Params.DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64() {
return shouldFlushSegments

View File

@ -54,6 +54,7 @@ type dataSyncService struct {
dataCoord types.DataCoord // DataCoord instance to interact with
clearSignal chan<- string // signal channel to notify flowgraph close for collection/partition drop msg consumed
delBufferManager *DelBufferManager
flushingSegCache *Cache // a guarding cache stores currently flushing segment ids
flushManager flushManager // flush manager handles flush process
chunkManager storage.ChunkManager
@ -80,6 +81,12 @@ func newDataSyncService(ctx context.Context,
ctx1, cancel := context.WithCancel(ctx)
delBufferManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
service := &dataSyncService{
ctx: ctx1,
cancelFn: cancel,
@ -93,6 +100,7 @@ func newDataSyncService(ctx context.Context,
vchannelName: vchan.GetChannelName(),
dataCoord: dataCoord,
clearSignal: clearSignal,
delBufferManager: delBufferManager,
flushingSegCache: flushingSegCache,
chunkManager: chunkManager,
compactor: compactor,
@ -287,6 +295,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
insertBufferNode, err = newInsertBufferNode(
dsService.ctx,
dsService.collectionID,
dsService.delBufferManager,
dsService.flushCh,
dsService.resendTTCh,
dsService.flushManager,
@ -298,7 +307,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
}
var deleteNode Node
deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, dsService.clearSignal, c)
deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, dsService.delBufferManager, dsService.clearSignal, c)
if err != nil {
return err
}

View File

@ -117,32 +117,13 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
dn.showDelBuf(segIDs, fgMsg.timeRange.timestampMax)
}
//here we adopt a quite radical strategy:
//every time we make sure that the N biggest delDataBuf can be flushed
//when memsize usage reaches a certain level
//then we will add all segments in the fgMsg.segmentsToFlush into the toFlushSeg and remove duplicate segments
//the aim for taking all these actions is to guarantee that the memory consumed by delBuf will not exceed a limit
segmentsToFlush := dn.delBufferManager.ShouldFlushSegments()
for _, msgSegmentID := range fgMsg.segmentsToSync {
existed := false
for _, autoFlushSegment := range segmentsToFlush {
if msgSegmentID == autoFlushSegment {
existed = true
break
}
}
if !existed {
segmentsToFlush = append(segmentsToFlush, msgSegmentID)
}
}
// process flush messages
if len(segmentsToFlush) > 0 {
if len(fgMsg.segmentsToSync) > 0 {
log.Debug("DeleteNode receives flush message",
zap.Int64s("segIDs", segmentsToFlush),
zap.Int64s("segIDs", fgMsg.segmentsToSync),
zap.String("vChannelName", dn.channelName),
zap.Time("posTime", tsoutil.PhysicalTime(fgMsg.endPositions[0].Timestamp)))
for _, segmentToFlush := range segmentsToFlush {
for _, segmentToFlush := range fgMsg.segmentsToSync {
buf, ok := dn.delBufferManager.Load(segmentToFlush)
if !ok {
// no related delta data to flush, send empty buf to complete flush life-cycle
@ -239,23 +220,19 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []primaryKey, tss [
return segID2Pks, segID2Tss
}
func newDeleteNode(ctx context.Context, fm flushManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) {
func newDeleteNode(ctx context.Context, fm flushManager, delBufManager *DelBufferManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(config.maxQueueLength)
baseNode.SetMaxParallelism(config.maxParallelism)
return &deleteNode{
ctx: ctx,
BaseNode: baseNode,
delBufferManager: &DelBufferManager{
channel: config.channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
},
channel: config.channel,
idAllocator: config.allocator,
channelName: config.vChannelName,
flushManager: fm,
clearSignal: sig,
ctx: ctx,
BaseNode: baseNode,
delBufferManager: delBufManager,
channel: config.channel,
idAllocator: config.allocator,
channelName: config.vChannelName,
flushManager: fm,
clearSignal: sig,
}, nil
}

View File

@ -51,7 +51,7 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) {
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
dn, err := newDeleteNode(test.ctx, nil, make(chan string, 1), test.config)
dn, err := newDeleteNode(test.ctx, nil, nil, make(chan string, 1), test.config)
assert.Nil(t, err)
assert.NotNil(t, dn)
@ -189,8 +189,13 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
allocator: &allocator{},
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
dn, err := newDeleteNode(context.Background(), fm, make(chan string, 1), c)
dn, err := newDeleteNode(context.Background(), fm, delBufManager, make(chan string, 1), c)
assert.Nil(t, err)
segID2Pks, _ := dn.filterSegmentByPK(0, varCharPks, tss)
@ -219,7 +224,13 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
vChannelName: chanName,
}
dn, err := newDeleteNode(context.Background(), fm, make(chan string, 1), c)
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
dn, err := newDeleteNode(context.Background(), fm, delBufManager, make(chan string, 1), c)
assert.Nil(t, err)
segID2Pks, _ := dn.filterSegmentByPK(0, int64Pks, tss)
@ -254,7 +265,12 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c)
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
assert.Nil(te, err)
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
@ -277,7 +293,12 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c)
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
assert.Nil(te, err)
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
@ -306,8 +327,13 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
sig := make(chan string, 1)
delNode, err := newDeleteNode(ctx, fm, sig, c)
delNode, err := newDeleteNode(ctx, fm, delBufManager, sig, c)
assert.Nil(t, err)
msg := genFlowGraphDeleteMsg(int64Pks, chanName)
@ -344,7 +370,12 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c)
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
assert.Nil(t, err)
compactedSegment := UniqueID(10020987)
@ -394,10 +425,15 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
mockFlushManager := &mockFlushManager{
recordFlushedSeg: true,
}
delNode, err := newDeleteNode(ctx, mockFlushManager, make(chan string, 1), c)
delNode, err := newDeleteNode(ctx, mockFlushManager, delBufManager, make(chan string, 1), c)
assert.Nil(t, err)
//2. here we set flushing segments inside fgmsg to empty
@ -410,6 +446,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
//and the sum of memory consumption in this case is 208
//so no segments will be flushed
paramtable.Get().Save(Params.DataNodeCfg.FlushDeleteBufferBytes.Key, "300")
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 0, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(208), delNode.delBufferManager.delMemorySize)
@ -422,6 +459,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
msg.deleteMessages = []*msgstream.DeleteMsg{}
msg.segmentsToSync = []UniqueID{}
paramtable.Get().Save(Params.DataNodeCfg.FlushDeleteBufferBytes.Key, "200")
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(160), delNode.delBufferManager.delMemorySize)
@ -429,6 +467,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
//4. there is no new delete msg and delBufferSize is still 200
//we expect there will not be any auto flush del
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(160), delNode.delBufferManager.delMemorySize)
@ -438,6 +477,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
//segment which is 48 in size to be flushed, so the remained del memory size
//will be 112
paramtable.Get().Save(Params.DataNodeCfg.FlushDeleteBufferBytes.Key, "150")
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 2, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(112), delNode.delBufferManager.delMemorySize)
@ -446,6 +486,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
//6. we reset buffer bytes to 60, then most of the segments will be flushed
//except for the smallest entry with size equaling to 32
paramtable.Get().Save(Params.DataNodeCfg.FlushDeleteBufferBytes.Key, "60")
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 4, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(32), delNode.delBufferManager.delMemorySize)
@ -455,6 +496,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
//is more than 20, so all five segments will be flushed and the remained
//del memory will be lowered to zero
paramtable.Get().Save(Params.DataNodeCfg.FlushDeleteBufferBytes.Key, "20")
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 5, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(0), delNode.delBufferManager.delMemorySize)
@ -484,7 +526,12 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c)
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
require.NoError(t, err)
tests := []struct {
@ -529,7 +576,12 @@ func TestFlowGraphDeleteNode_updateCompactedSegments(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: chanName,
}
delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c)
delBufManager := &DelBufferManager{
channel: &channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
require.NoError(t, err)
tests := []struct {

View File

@ -46,10 +46,11 @@ import (
type insertBufferNode struct {
BaseNode
ctx context.Context
channelName string
channel Channel
idAllocator allocatorInterface
ctx context.Context
channelName string
delBufferManager *DelBufferManager // manager of delete msg
channel Channel
idAllocator allocatorInterface
flushMap sync.Map
flushChan <-chan flushMsg
@ -330,6 +331,19 @@ func (ibNode *insertBufferNode) FillInSyncTasks(fgMsg *flowGraphMsg, seg2Upload
}
}
// sync delete
//here we adopt a quite radical strategy:
//every time we make sure that the N biggest delDataBuf can be flushed
//when memsize usage reaches a certain level
//the aim for taking all these actions is to guarantee that the memory consumed by delBuf will not exceed a limit
segmentsToFlush := ibNode.delBufferManager.ShouldFlushSegments()
for _, segID := range segmentsToFlush {
syncTasks[segID] = &syncTask{
buffer: nil, // nil is valid
segmentID: segID,
}
}
syncSegmentIDs := ibNode.channel.listSegmentIDsToSync(fgMsg.endPositions[0].Timestamp)
for _, segID := range syncSegmentIDs {
buf := ibNode.GetBuffer(segID)
@ -593,7 +607,7 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni
return ibNode.channel.getCollectionAndPartitionID(segmentID)
}
func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan flushMsg, resendTTCh <-chan resendTTMsg,
func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *DelBufferManager, flushCh <-chan flushMsg, resendTTCh <-chan resendTTMsg,
fm flushManager, flushingSegCache *Cache, config *nodeConfig) (*insertBufferNode, error) {
baseNode := BaseNode{}
@ -659,10 +673,11 @@ func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan fl
flushingSegCache: flushingSegCache,
flushManager: fm,
channel: config.channel,
idAllocator: config.allocator,
channelName: config.vChannelName,
ttMerger: mt,
ttLogger: &timeTickLogger{vChannelName: config.vChannelName},
delBufferManager: delBufManager,
channel: config.channel,
idAllocator: config.allocator,
channelName: config.vChannelName,
ttMerger: mt,
ttLogger: &timeTickLogger{vChannelName: config.vChannelName},
}, nil
}

View File

@ -105,8 +105,13 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c)
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
assert.NotNil(t, iBNode)
require.NoError(t, err)
@ -120,7 +125,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
cd: 0,
}
_, err = newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c)
_, err = newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
assert.Error(t, err)
}
@ -198,8 +203,13 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c)
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
require.NoError(t, err)
// trigger log ts
@ -361,7 +371,12 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c)
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
require.NoError(t, err)
// Auto flush number of rows set to 2
@ -596,7 +611,12 @@ func TestRollBF(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c)
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
require.NoError(t, err)
// Auto flush number of rows set to 2
@ -675,7 +695,8 @@ func TestRollBF(t *testing.T) {
type InsertBufferNodeSuite struct {
suite.Suite
channel *ChannelMeta
channel *ChannelMeta
delBufManager *DelBufferManager
collID UniqueID
partID UniqueID
@ -690,9 +711,16 @@ func (s *InsertBufferNodeSuite) SetupSuite() {
pkType: schemapb.DataType_Int64,
}
delBufManager := &DelBufferManager{
channel: s.channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
s.collID = 1
s.partID = 10
s.channel = newChannel("channel", s.collID, nil, rc, s.cm)
s.delBufManager = delBufManager
s.cm = storage.NewLocalChunkManager(storage.RootPath(insertBufferNodeTestDir))
s.originalConfig = Params.DataNodeCfg.FlushInsertBufferSize.GetAsInt64()
@ -737,9 +765,10 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() {
fgMsg := &flowGraphMsg{dropCollection: true}
node := &insertBufferNode{
channelName: s.channel.channelName,
channel: s.channel,
flushChan: make(chan flushMsg, 100),
channelName: s.channel.channelName,
channel: s.channel,
delBufferManager: s.delBufManager,
flushChan: make(chan flushMsg, 100),
}
syncTasks := node.FillInSyncTasks(fgMsg, nil)
@ -756,9 +785,10 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() {
segToFlush := []UniqueID{1, 2}
node := &insertBufferNode{
channelName: s.channel.channelName,
channel: s.channel,
flushChan: make(chan flushMsg, 100),
channelName: s.channel.channelName,
channel: s.channel,
delBufferManager: s.delBufManager,
flushChan: make(chan flushMsg, 100),
}
buffer := BufferData{
@ -783,9 +813,10 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() {
s.Run("drop partition", func() {
fgMsg := flowGraphMsg{dropPartitions: []UniqueID{s.partID}, endPositions: []*internalpb.MsgPosition{{Timestamp: 100}}}
node := &insertBufferNode{
channelName: s.channel.channelName,
channel: s.channel,
flushChan: make(chan flushMsg, 100),
channelName: s.channel.channelName,
channel: s.channel,
delBufferManager: s.delBufManager,
flushChan: make(chan flushMsg, 100),
}
syncTasks := node.FillInSyncTasks(&fgMsg, nil)
@ -802,9 +833,10 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() {
s.Run("manual sync", func() {
flushCh := make(chan flushMsg, 100)
node := &insertBufferNode{
channelName: s.channel.channelName,
channel: s.channel,
flushChan: flushCh,
channelName: s.channel.channelName,
channel: s.channel,
delBufferManager: s.delBufManager,
flushChan: flushCh,
}
for i := 1; i <= 3; i++ {
@ -829,9 +861,10 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() {
s.Run("manual sync over load", func() {
flushCh := make(chan flushMsg, 100)
node := &insertBufferNode{
channelName: s.channel.channelName,
channel: s.channel,
flushChan: flushCh,
channelName: s.channel.channelName,
channel: s.channel,
delBufferManager: s.delBufManager,
flushChan: flushCh,
}
for i := 1; i <= 100; i++ {
@ -852,7 +885,6 @@ func (s *InsertBufferNodeSuite) TestFillInSyncTasks() {
s.Assert().False(task.auto)
s.Assert().False(task.dropped)
}
})
}
@ -935,7 +967,12 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, resendTTChan, fm, newCache(), c)
delBufManager := &DelBufferManager{
channel: channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
require.NoError(t, err)
inMsg := genFlowGraphInsertMsg(insertChannelName)