Remove panic in NewDataSyncService (#5888)

Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/5893/head
sunby 2021-06-19 15:18:06 +08:00 committed by GitHub
parent 143b54143c
commit 3cc0ab1345
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 76 deletions

View File

@ -166,7 +166,10 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
)
flushChan := make(chan *flushMsg, 100)
dataSyncService := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataService, node.masterService)
dataSyncService, err := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchan, node.clearSignal, node.dataService)
if err != nil {
return err
}
node.vchan2SyncService[vchan.GetChannelName()] = dataSyncService
node.vchan2FlushCh[vchan.GetChannelName()] = flushChan
@ -262,7 +265,9 @@ func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmCha
zap.String("channel name", chanInfo.ChannelName),
zap.Any("channal Info", chanInfo),
)
node.NewDataSyncService(chanInfo)
if err := node.NewDataSyncService(chanInfo); err != nil {
log.Warn("Failed to new data sync service", zap.Any("channel", chanInfo))
}
}
status.ErrorCode = commonpb.ErrorCode_Success

View File

@ -19,7 +19,7 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/flowgraph"
@ -27,17 +27,16 @@ import (
)
type dataSyncService struct {
ctx context.Context
cancelFn context.CancelFunc
fg *flowgraph.TimeTickedFlowGraph
flushChan <-chan *flushMsg
replica Replica
idAllocator allocatorInterface
msFactory msgstream.Factory
collectionID UniqueID
dataService types.DataService
masterService types.MasterService
clearSignal chan<- UniqueID
ctx context.Context
cancelFn context.CancelFunc
fg *flowgraph.TimeTickedFlowGraph
flushChan <-chan *flushMsg
replica Replica
idAllocator allocatorInterface
msFactory msgstream.Factory
collectionID UniqueID
dataService types.DataService
clearSignal chan<- UniqueID
}
func newDataSyncService(ctx context.Context,
@ -48,28 +47,28 @@ func newDataSyncService(ctx context.Context,
vchan *datapb.VchannelInfo,
clearSignal chan<- UniqueID,
dataService types.DataService,
masterService types.MasterService,
) *dataSyncService {
) (*dataSyncService, error) {
ctx1, cancel := context.WithCancel(ctx)
service := &dataSyncService{
ctx: ctx1,
cancelFn: cancel,
fg: nil,
flushChan: flushChan,
replica: replica,
idAllocator: alloc,
msFactory: factory,
collectionID: vchan.GetCollectionID(),
dataService: dataService,
clearSignal: clearSignal,
masterService: masterService,
ctx: ctx1,
cancelFn: cancel,
fg: nil,
flushChan: flushChan,
replica: replica,
idAllocator: alloc,
msFactory: factory,
collectionID: vchan.GetCollectionID(),
dataService: dataService,
clearSignal: clearSignal,
}
service.initNodes(vchan)
return service
if err := service.initNodes(vchan); err != nil {
return nil, err
}
return service, nil
}
func (dsService *dataSyncService) start() {
@ -90,43 +89,7 @@ func (dsService *dataSyncService) close() {
dsService.cancelFn()
}
func (dsService *dataSyncService) getPChannel(collectionID UniqueID, vchan string) (string, error) {
req := &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
MsgID: 0,
Timestamp: 0,
SourceID: Params.NodeID,
},
DbName: "",
CollectionName: "",
CollectionID: collectionID,
TimeStamp: 0,
}
resp, err := dsService.masterService.DescribeCollection(dsService.ctx, req)
if err != nil {
log.Error("Failed to describe collection", zap.Int64("collectionID", collectionID))
return "", err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Error("Failed to describe collection", zap.Int64("collectionID", collectionID),
zap.String("Reason", resp.Status.GetReason()))
return "", fmt.Errorf("Failed to describe collection, resp.Reason: %s", resp.Status.GetReason())
}
vchans := resp.GetVirtualChannelNames()
pchans := resp.GetPhysicalChannelNames()
for i, v := range vchans {
if vchan == v {
return pchans[i], nil
}
}
return "", fmt.Errorf("Can not find physical channel of %s", vchan)
}
func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) {
func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error {
// TODO: add delete pipeline support
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
@ -138,7 +101,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) {
err := dsService.msFactory.SetParams(m)
if err != nil {
panic(err)
return err
}
saveBinlog := func(fu *segmentFlushUnit) error {
@ -186,11 +149,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) {
return nil
}
pchan, err := dsService.getPChannel(vchanInfo.GetCollectionID(), vchanInfo.GetChannelName())
if err != nil {
//FIXME dont panic
panic(err)
}
pchan := rootcoord.ToPhysicalChannel(vchanInfo.GetChannelName())
var dmStreamNode Node = newDmInputNode(
dsService.ctx,
dsService.msFactory,
@ -241,7 +200,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) {
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
panic("set edges faild in the node")
return err
}
// ddNode
@ -251,7 +210,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) {
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err))
panic("set edges faild in the node")
return err
}
// insertBufferNode
@ -261,6 +220,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) {
)
if err != nil {
log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err))
panic("set edges faild in the node")
return err
}
return nil
}

View File

@ -52,6 +52,7 @@ func TestDataSyncService_Start(t *testing.T) {
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err := msFactory.SetParams(m)
assert.Nil(t, err)
insertChannelName := "data_sync_service_test_dml"
ddlChannelName := "data_sync_service_test_ddl"
@ -65,8 +66,9 @@ func TestDataSyncService_Start(t *testing.T) {
}
signalCh := make(chan UniqueID, 100)
sync := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataServiceFactory{}, &MasterServiceFactory{})
sync, err := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh, &DataServiceFactory{})
assert.Nil(t, err)
// sync.replica.addCollection(collMeta.ID, collMeta.Schema)
go sync.start()