Make DN fail faster when watching an invalid chan (#11102)

See also: #11098

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/11263/head
XuanYang-cn 2021-11-04 15:56:28 +08:00 committed by GitHub
parent 0ef95c5df1
commit 8863c01ff7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 35 additions and 35 deletions

View File

@ -216,41 +216,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
return nil
})
c := &nodeConfig{
msFactory: dsService.msFactory,
collectionID: vchanInfo.GetCollectionID(),
vChannelName: vchanInfo.GetChannelName(),
replica: dsService.replica,
allocator: dsService.idAllocator,
parallelConfig: newParallelConfig(),
}
var dmStreamNode Node
dmStreamNode, err = newDmInputNode(dsService.ctx, vchanInfo.GetSeekPosition(), c)
if err != nil {
return err
}
var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo)
var insertBufferNode Node
insertBufferNode, err = newInsertBufferNode(
dsService.ctx,
dsService.flushCh,
dsService.flushManager,
dsService.flushingSegCache,
c,
)
if err != nil {
return err
}
var deleteNode Node
deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, c)
if err != nil {
return err
}
// recover segment checkpoints
for _, us := range vchanInfo.GetUnflushedSegments() {
if us.CollectionID != dsService.collectionID ||
@ -298,6 +263,41 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
}
}
c := &nodeConfig{
msFactory: dsService.msFactory,
collectionID: vchanInfo.GetCollectionID(),
vChannelName: vchanInfo.GetChannelName(),
replica: dsService.replica,
allocator: dsService.idAllocator,
parallelConfig: newParallelConfig(),
}
var dmStreamNode Node
dmStreamNode, err = newDmInputNode(dsService.ctx, vchanInfo.GetSeekPosition(), c)
if err != nil {
return err
}
var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo)
var insertBufferNode Node
insertBufferNode, err = newInsertBufferNode(
dsService.ctx,
dsService.flushCh,
dsService.flushManager,
dsService.flushingSegCache,
c,
)
if err != nil {
return err
}
var deleteNode Node
deleteNode, err = newDeleteNode(dsService.ctx, dsService.flushManager, c)
if err != nil {
return err
}
dsService.fg.AddNode(dmStreamNode)
dsService.fg.AddNode(ddNode)
dsService.fg.AddNode(insertBufferNode)