Add flush channel for delete_node (#8762)

Resolves: #8761

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/8804/head
XuanYang-cn 2021-09-28 18:22:16 +08:00 committed by GitHub
parent ae8fb86d25
commit 3be6672753
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 70 additions and 33 deletions

View File

@ -87,9 +87,10 @@ type DataNode struct {
chanMut sync.RWMutex
vchan2SyncService map[string]*dataSyncService // vchannel name
vchan2FlushCh map[string]chan<- *flushMsg // vchannel name
clearSignal chan UniqueID // collection ID
segmentCache *Cache
vchan2FlushChs map[string]*flushChans // vchannel name to flush channels
clearSignal chan UniqueID // collection ID
segmentCache *Cache
rootCoord types.RootCoord
dataCoord types.DataCoord
@ -103,6 +104,14 @@ type DataNode struct {
msFactory msgstream.Factory
}
type flushChans struct {
// Flush signal for insert buffer
insertBufferCh chan *flushMsg
// Flush signal for delete buffer
deleteBufferCh chan *flushMsg
}
// NewDataNode will return a DataNode with abnormal state.
func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
rand.Seed(time.Now().UnixNano())
@ -118,7 +127,7 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
segmentCache: newCache(),
vchan2SyncService: make(map[string]*dataSyncService),
vchan2FlushCh: make(map[string]chan<- *flushMsg),
vchan2FlushChs: make(map[string]*flushChans),
clearSignal: make(chan UniqueID, 100),
}
node.UpdateStateCode(internalpb.StateCode_Abnormal)
@ -256,14 +265,18 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
zap.Int("Flushed Segment Number", len(vchan.GetFlushedSegments())),
)
flushChan := make(chan *flushMsg, 100)
dataSyncService, err := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache)
flushChs := &flushChans{
insertBufferCh: make(chan *flushMsg, 100),
deleteBufferCh: make(chan *flushMsg, 100),
}
dataSyncService, err := newDataSyncService(node.ctx, flushChs, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataCoord, node.segmentCache)
if err != nil {
return err
}
node.vchan2SyncService[vchan.GetChannelName()] = dataSyncService
node.vchan2FlushCh[vchan.GetChannelName()] = flushChan
node.vchan2FlushChs[vchan.GetChannelName()] = flushChs
log.Info("Start New dataSyncService",
zap.Int64("Collection ID", vchan.GetCollectionID()),
@ -302,7 +315,7 @@ func (node *DataNode) ReleaseDataSyncService(vchanName string) {
}
delete(node.vchan2SyncService, vchanName)
delete(node.vchan2FlushCh, vchanName)
delete(node.vchan2FlushChs, vchanName)
log.Debug("Release flowgraph resources end", zap.String("Vchannel", vchanName))
}
@ -455,14 +468,14 @@ func (node *DataNode) ReadyToFlush() error {
node.chanMut.RLock()
defer node.chanMut.RUnlock()
if len(node.vchan2SyncService) == 0 && len(node.vchan2FlushCh) == 0 {
if len(node.vchan2SyncService) == 0 && len(node.vchan2FlushChs) == 0 {
// Healthy but Idle
msg := "DataNode HEALTHY but IDLE, please try WatchDmChannels to make it work"
log.Warn(msg)
return errors.New(msg)
}
if len(node.vchan2SyncService) != len(node.vchan2FlushCh) {
if len(node.vchan2SyncService) != len(node.vchan2FlushChs) {
// TODO restart
msg := "DataNode HEALTHY but abnormal inside, restarting..."
log.Warn(msg)
@ -511,20 +524,25 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
node.segmentCache.Cache(id)
node.chanMut.RLock()
flushCh, ok := node.vchan2FlushCh[chanName]
flushChs, ok := node.vchan2FlushChs[chanName]
node.chanMut.RUnlock()
if !ok {
status.Reason = "DataNode abnormal, restarting"
return status, nil
}
flushmsg := &flushMsg{
insertFlushmsg := flushMsg{
msgID: req.Base.MsgID,
timestamp: req.Base.Timestamp,
segmentID: id,
collectionID: req.CollectionID,
}
flushCh <- flushmsg
// Copy flushMsg to a different address
deleteFlushMsg := insertFlushmsg
flushChs.insertBufferCh <- &insertFlushmsg
flushChs.deleteBufferCh <- &deleteFlushMsg
}
log.Debug("FlushSegments tasks triggered",

View File

@ -105,14 +105,14 @@ func TestDataNode(t *testing.T) {
assert.NoError(t, err)
if testcase.expect {
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.NotNil(t, node1.vchan2FlushCh)
assert.NotNil(t, node1.vchan2FlushChs)
assert.NotNil(t, node1.vchan2SyncService)
sync, ok := node1.vchan2SyncService[testcase.channels[0]]
assert.True(t, ok)
assert.NotNil(t, sync)
assert.Equal(t, UniqueID(1), sync.collectionID)
assert.Equal(t, len(testcase.channels), len(node1.vchan2SyncService))
assert.Equal(t, len(node1.vchan2FlushCh), len(node1.vchan2SyncService))
assert.Equal(t, len(node1.vchan2FlushChs), len(node1.vchan2SyncService))
} else {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.ErrorCode)
assert.Equal(t, testcase.failReason, resp.Reason)
@ -174,17 +174,17 @@ func TestDataNode(t *testing.T) {
UnflushedSegments: []*datapb.SegmentInfo{},
}
require.Equal(t, 0, len(node2.vchan2FlushCh))
require.Equal(t, 0, len(node2.vchan2FlushChs))
require.Equal(t, 0, len(node2.vchan2SyncService))
err := node2.NewDataSyncService(vchan)
assert.NoError(t, err)
assert.Equal(t, 1, len(node2.vchan2FlushCh))
assert.Equal(t, 1, len(node2.vchan2FlushChs))
assert.Equal(t, 1, len(node2.vchan2SyncService))
err = node2.NewDataSyncService(vchan)
assert.NoError(t, err)
assert.Equal(t, 1, len(node2.vchan2FlushCh))
assert.Equal(t, 1, len(node2.vchan2FlushChs))
assert.Equal(t, 1, len(node2.vchan2SyncService))
cancel()
@ -367,7 +367,7 @@ func TestDataNode(t *testing.T) {
assert.Eventually(t, func() bool {
node.chanMut.Lock()
defer node.chanMut.Unlock()
return len(node.vchan2FlushCh) == 0
return len(node.vchan2FlushChs) == 0
}, time.Second, time.Millisecond)
cancel()
@ -384,12 +384,12 @@ func TestDataNode(t *testing.T) {
err := node.NewDataSyncService(vchan)
require.NoError(t, err)
require.Equal(t, 1, len(node.vchan2FlushCh))
require.Equal(t, 1, len(node.vchan2FlushChs))
require.Equal(t, 1, len(node.vchan2SyncService))
time.Sleep(100 * time.Millisecond)
node.ReleaseDataSyncService(dmChannelName)
assert.Equal(t, 0, len(node.vchan2FlushCh))
assert.Equal(t, 0, len(node.vchan2FlushChs))
assert.Equal(t, 0, len(node.vchan2SyncService))
s, ok := node.vchan2SyncService[dmChannelName]

View File

@ -31,7 +31,7 @@ type dataSyncService struct {
ctx context.Context
cancelFn context.CancelFunc
fg *flowgraph.TimeTickedFlowGraph
flushChan <-chan *flushMsg
flushChs *flushChans
replica Replica
idAllocator allocatorInterface
msFactory msgstream.Factory
@ -45,7 +45,7 @@ type dataSyncService struct {
}
func newDataSyncService(ctx context.Context,
flushChan <-chan *flushMsg,
flushChs *flushChans,
replica Replica,
alloc allocatorInterface,
factory msgstream.Factory,
@ -66,7 +66,7 @@ func newDataSyncService(ctx context.Context,
ctx: ctx1,
cancelFn: cancel,
fg: nil,
flushChan: flushChan,
flushChs: flushChs,
replica: replica,
idAllocator: alloc,
msFactory: factory,
@ -181,7 +181,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
dsService.replica,
dsService.msFactory,
dsService.idAllocator,
dsService.flushChan,
dsService.flushChs.insertBufferCh,
saveBinlog,
vchanInfo.GetChannelName(),
dsService.flushingSegCache,
@ -190,7 +190,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return err
}
dn := newDeleteNode(dsService.replica, vchanInfo.GetChannelName())
dn := newDeleteNode(dsService.replica, vchanInfo.GetChannelName(), dsService.flushChs.deleteBufferCh)
var deleteNode Node = dn

View File

@ -98,7 +98,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) {
}
ds, err := newDataSyncService(ctx,
make(chan *flushMsg),
&flushChans{make(chan *flushMsg), make(chan *flushMsg)},
replica,
NewAllocatorFactory(),
test.inMsgFactory,
@ -163,7 +163,7 @@ func TestDataSyncService_Start(t *testing.T) {
mockRootCoord := &RootCoordFactory{}
collectionID := UniqueID(1)
flushChan := make(chan *flushMsg, 100)
flushChan := &flushChans{make(chan *flushMsg, 100), make(chan *flushMsg, 100)}
replica := newReplica(mockRootCoord, collectionID)
allocFactory := NewAllocatorFactory(1)

View File

@ -26,6 +26,8 @@ type deleteNode struct {
channelName string
replica Replica
flushCh <-chan *flushMsg
}
func (dn *deleteNode) Name() string {
@ -50,6 +52,16 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
return []Msg{}
}
select {
case fmsg := <-dn.flushCh:
currentSegID := fmsg.segmentID
log.Debug("DeleteNode receives flush message",
zap.Int64("segmentID", currentSegID),
zap.Int64("collectionID", fmsg.collectionID),
)
default:
}
return []Msg{}
}
@ -75,7 +87,7 @@ func (dn *deleteNode) filterSegmentByPK(pks []int64) (map[int64][]int64, error)
return results, nil
}
func newDeleteNode(replica Replica, channelName string) *deleteNode {
func newDeleteNode(replica Replica, channelName string, flushCh <-chan *flushMsg) *deleteNode {
baseNode := BaseNode{}
baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength)
@ -84,5 +96,7 @@ func newDeleteNode(replica Replica, channelName string) *deleteNode {
channelName: channelName,
replica: replica,
flushCh: flushCh,
}
}

View File

@ -52,7 +52,7 @@ func TestFlowGraphDeleteNode_newDeleteNode(te *testing.T) {
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
dn := newDeleteNode(test.replica, "")
dn := newDeleteNode(test.replica, "", make(chan *flushMsg))
assert.NotNil(t, dn)
assert.Equal(t, "deleteNode", dn.Name())
@ -81,11 +81,13 @@ func TestFlowGraphDeleteNode_Operate(te *testing.T) {
for _, test := range tests {
te.Run(test.description, func(t *testing.T) {
dn := deleteNode{}
flushCh := make(chan *flushMsg, 10)
dn := deleteNode{flushCh: flushCh}
if test.invalidIn != nil {
rt := dn.Operate(test.invalidIn)
assert.Empty(t, rt)
} else {
flushCh <- &flushMsg{0, 100, 10, 1}
rt := dn.Operate(test.validIn)
assert.Empty(t, rt)
}
@ -145,7 +147,7 @@ func Test_GetSegmentsByPKs(t *testing.T) {
mockReplica.normalSegments[segment4.segmentID] = segment4
mockReplica.flushedSegments[segment5.segmentID] = segment5
mockReplica.flushedSegments[segment6.segmentID] = segment6
dn := newDeleteNode(mockReplica, "test")
dn := newDeleteNode(mockReplica, "test", make(chan *flushMsg))
results, err := dn.filterSegmentByPK([]int64{0, 1, 2, 3, 4})
assert.Nil(t, err)
expected := map[int64][]int64{

View File

@ -147,12 +147,15 @@ func (i *IndexNode) Init() error {
BucketName: Params.MinioBucketName,
CreateBucket: true,
}
i.kv, err = miniokv.NewMinIOKV(i.loopCtx, option)
kv, err := miniokv.NewMinIOKV(i.loopCtx, option)
if err != nil {
log.Error("IndexNode NewMinIOKV failed", zap.Error(err))
initErr = err
return
}
i.kv = kv
log.Debug("IndexNode NewMinIOKV success")
i.closer = trace.InitTracing("index_node")