diff --git a/docs/design_docs/datanode_ddl_flush_design_0519_2021.md b/docs/design_docs/datanode_ddl_flush_design_0519_2021.md index 167bf99fce..3ac5746bb0 100644 --- a/docs/design_docs/datanode_ddl_flush_design_0519_2021.md +++ b/docs/design_docs/datanode_ddl_flush_design_0519_2021.md @@ -2,6 +2,9 @@ update: 5.19.2021, by [Goose](https://github.com/XuanYang-cn) update: 5.21.2021, by [Goose](https://github.com/XuanYang-cn) +update: 6.04.2021, by [Goose](https://github.com/XuanYang-cn) + +**THIS IS OUTDATE** ## Background diff --git a/go.mod b/go.mod index 90b7620131..5b0988e461 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/yahoo/athenz v1.9.16 // indirect go.etcd.io/etcd v3.3.25+incompatible go.uber.org/zap v1.15.0 - golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect + golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb golang.org/x/text v0.3.3 diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 3087289f28..858fcd0c8a 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -64,8 +64,9 @@ type DataNode struct { watchDm chan struct{} chanMut sync.RWMutex - vchan2SyncService map[string]*dataSyncService - vchan2FlushCh map[string]chan<- *flushMsg + vchan2SyncService map[string]*dataSyncService // vchannel name + vchan2FlushCh map[string]chan<- *flushMsg // vchannel name + clearSignal chan UniqueID // collection ID masterService types.MasterService dataService types.DataService @@ -93,6 +94,7 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode { vchan2SyncService: make(map[string]*dataSyncService), vchan2FlushCh: make(map[string]chan<- *flushMsg), + clearSignal: make(chan UniqueID, 100), } node.UpdateStateCode(internalpb.StateCode_Abnormal) return node @@ -201,7 +203,7 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error { metaService := newMetaService(node.ctx, replica, node.masterService) flushChan := make(chan *flushMsg, 100) - dataSyncService := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchan) + dataSyncService := newDataSyncService(node.ctx, flushChan, replica, alloc, node.msFactory, vchan, node.clearSignal) // TODO metaService using timestamp in DescribeCollection node.vchan2SyncService[vchan.GetChannelName()] = dataSyncService node.vchan2FlushCh[vchan.GetChannelName()] = flushChan @@ -212,8 +214,44 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error { return nil } +// BackGroundGC runs in background to release datanode resources +func (node *DataNode) BackGroundGC(collIDCh <-chan UniqueID) { + log.Info("DataNode Background GC Start") + for { + select { + case collID := <-collIDCh: + log.Info("GC collection", zap.Int64("ID", collID)) + for _, vchanName := range node.getChannelNamesbyCollectionID(collID) { + node.ReleaseDataSyncService(vchanName) + } + case <-node.ctx.Done(): + return + } + } +} + +// ReleaseDataSyncService release flowgraph resources for a vchanName +func (node *DataNode) ReleaseDataSyncService(vchanName string) { + log.Info("Release flowgraph resources begin", zap.String("Vchannel", vchanName)) + + node.chanMut.Lock() + if dss, ok := node.vchan2SyncService[vchanName]; ok { + dss.close() + } + + delete(node.vchan2SyncService, vchanName) + node.chanMut.Unlock() + + node.chanMut.Lock() + delete(node.vchan2FlushCh, vchanName) + node.chanMut.Unlock() + + log.Debug("Release flowgraph resources end", zap.String("Vchannel", vchanName)) +} + // Start will update DataNode state to HEALTHY func (node *DataNode) Start() error { + go node.BackGroundGC(node.clearSignal) node.UpdateStateCode(internalpb.StateCode_Healthy) return nil } @@ -263,7 +301,7 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.Compo return states, nil } -func (node *DataNode) getChannelName(segID UniqueID) string { +func (node *DataNode) getChannelNamebySegmentID(segID UniqueID) string { node.chanMut.RLock() defer node.chanMut.RUnlock() for name, dataSync := range node.vchan2SyncService { @@ -274,6 +312,19 @@ func (node *DataNode) getChannelName(segID UniqueID) string { return "" } +func (node *DataNode) getChannelNamesbyCollectionID(collID UniqueID) []string { + node.chanMut.RLock() + defer node.chanMut.RUnlock() + + channels := make([]string, 0, len(node.vchan2SyncService)) + for name, dataSync := range node.vchan2SyncService { + if dataSync.collectionID == collID { + channels = append(channels, name) + } + } + return channels +} + // ReadyToFlush tells wether DataNode is ready for flushing func (node *DataNode) ReadyToFlush() error { if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy { @@ -328,7 +379,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen log.Debug("FlushSegments ...", zap.Int("num", len(req.SegmentIDs))) dmlFlushedCh := make(chan []*datapb.ID2PathList, len(req.SegmentIDs)) for _, id := range req.SegmentIDs { - chanName := node.getChannelName(id) + chanName := node.getChannelNamebySegmentID(id) log.Info("vchannel", zap.String("name", chanName)) if len(chanName) == 0 { status.Reason = fmt.Sprintf("DataNode not find segment %d!", id) diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index ec62993817..629be0b620 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -15,6 +15,7 @@ import ( "math" "os" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -116,9 +117,8 @@ func TestDataNode(t *testing.T) { t.Run("Test FlushSegments", func(t *testing.T) { dmChannelName := "fake-dm-channel-test-HEALTHDataNodeMock" - ddChannelName := "fake-dd-channel-test-HEALTHDataNodeMock" - node1 := newHEALTHDataNodeMock(dmChannelName, ddChannelName) + node1 := newHEALTHDataNodeMock(dmChannelName) sync, ok := node1.vchan2SyncService[dmChannelName] assert.True(t, ok) @@ -166,24 +166,14 @@ func TestDataNode(t *testing.T) { insertStream, _ := msFactory.NewMsgStream(node1.ctx) insertStream.AsProducer([]string{dmChannelName}) - ddStream, _ := msFactory.NewMsgStream(node1.ctx) - ddStream.AsProducer([]string{ddChannelName}) - var insertMsgStream msgstream.MsgStream = insertStream insertMsgStream.Start() - var ddMsgStream msgstream.MsgStream = ddStream - ddMsgStream.Start() - err = insertMsgStream.Broadcast(&timeTickMsgPack) assert.NoError(t, err) - err = ddMsgStream.Broadcast(&timeTickMsgPack) - assert.NoError(t, err) err = insertMsgStream.Broadcast(&timeTickMsgPack) assert.NoError(t, err) - err = ddMsgStream.Broadcast(&timeTickMsgPack) - assert.NoError(t, err) _, err = sync.replica.getSegmentByID(0) assert.NoError(t, err) @@ -204,6 +194,64 @@ func TestDataNode(t *testing.T) { assert.NoError(t, err) }) + t.Run("Test ReleaseDataSyncService", func(t *testing.T) { + dmChannelName := "fake-dm-channel-test-NewDataSyncService" + + vchan := &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: dmChannelName, + CheckPoints: []*datapb.CheckPoint{}, + } + + err := node.NewDataSyncService(vchan) + assert.NoError(t, err) + assert.Equal(t, 1, len(node.vchan2FlushCh)) + assert.Equal(t, 1, len(node.vchan2SyncService)) + time.Sleep(time.Second) + + node.ReleaseDataSyncService(dmChannelName) + assert.Equal(t, 0, len(node.vchan2FlushCh)) + assert.Equal(t, 0, len(node.vchan2SyncService)) + + s, ok := node.vchan2SyncService[dmChannelName] + assert.False(t, ok) + assert.Nil(t, s) + + }) + + t.Run("Test BackGroundGC", func(t *testing.T) { + collIDCh := make(chan UniqueID) + go node.BackGroundGC(collIDCh) + + dmChannelName := "fake-dm-channel-test-BackGroundGC" + + vchan := &datapb.VchannelInfo{ + CollectionID: 1, + ChannelName: dmChannelName, + CheckPoints: []*datapb.CheckPoint{}, + } + require.Equal(t, 0, len(node.vchan2FlushCh)) + require.Equal(t, 0, len(node.vchan2SyncService)) + + err := node.NewDataSyncService(vchan) + require.NoError(t, err) + time.Sleep(time.Second) + + require.Equal(t, 1, len(node.vchan2FlushCh)) + require.Equal(t, 1, len(node.vchan2SyncService)) + + collIDCh <- 1 + assert.Eventually(t, func() bool { + return len(node.vchan2FlushCh) == 0 + }, time.Second*4, time.Millisecond) + + assert.Equal(t, 0, len(node.vchan2SyncService)) + + s, ok := node.vchan2SyncService[dmChannelName] + assert.False(t, ok) + assert.Nil(t, s) + }) + <-node.ctx.Done() node.Stop() } diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 7dfa975390..b8599ba59f 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -27,6 +27,7 @@ import ( type dataSyncService struct { ctx context.Context + cancelFn context.CancelFunc fg *flowgraph.TimeTickedFlowGraph flushChan <-chan *flushMsg replica Replica @@ -34,6 +35,7 @@ type dataSyncService struct { msFactory msgstream.Factory collectionID UniqueID dataService types.DataService + clearSignal chan<- UniqueID } func newDataSyncService(ctx context.Context, @@ -41,10 +43,16 @@ func newDataSyncService(ctx context.Context, replica Replica, alloc allocatorInterface, factory msgstream.Factory, - vchan *datapb.VchannelInfo) *dataSyncService { + vchan *datapb.VchannelInfo, + clearSignal chan<- UniqueID, + +) *dataSyncService { + + ctx1, cancel := context.WithCancel(ctx) service := &dataSyncService{ - ctx: ctx, + ctx: ctx1, + cancelFn: cancel, fg: nil, flushChan: flushChan, replica: replica, @@ -58,8 +66,8 @@ func newDataSyncService(ctx context.Context, } func (dsService *dataSyncService) start() { - log.Debug("Data Sync Service Start Successfully") if dsService.fg != nil { + log.Debug("Data Sync Service starting flowgraph") dsService.fg.Start() } else { log.Debug("Data Sync Service flowgraph nil") @@ -68,11 +76,14 @@ func (dsService *dataSyncService) start() { func (dsService *dataSyncService) close() { if dsService.fg != nil { + log.Debug("Data Sync Service closing flowgraph") dsService.fg.Close() } + + dsService.cancelFn() } -func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelInfo) { +func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) { // TODO: add delete pipeline support dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) @@ -124,8 +135,9 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelInfo) { } return nil } - var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetChannelName(), vchanPair.GetCheckPoints()) - var ddNode Node = newDDNode() + + var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory, vchanInfo.GetChannelName(), vchanInfo.GetCheckPoints()) + var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID) var insertBufferNode Node = newInsertBufferNode( dsService.ctx, dsService.replica, diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 784751d5e2..5b22120184 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -61,7 +61,8 @@ func TestDataSyncService_Start(t *testing.T) { FlushedSegments: []int64{}, } - sync := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan) + signalCh := make(chan UniqueID, 100) + sync := newDataSyncService(ctx, flushChan, replica, allocFactory, msFactory, vchan, signalCh) sync.replica.addCollection(collMeta.ID, collMeta.Schema) go sync.start() diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index b0030280d6..1f48ccfb3d 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -23,6 +23,9 @@ import ( type ddNode struct { BaseNode + + clearSignal chan<- UniqueID + collectionID UniqueID } func (ddn *ddNode) Name() string { @@ -62,8 +65,10 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { for _, msg := range msMsg.TsMessages() { switch msg.Type() { case commonpb.MsgType_DropCollection: - // TODO distroy dataSyncService and nodify datanode - log.Error("Distorying current flowgraph") + if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID { + ddn.clearSignal <- ddn.collectionID + log.Info("Destroying current flowgraph") + } case commonpb.MsgType_Insert: resMsg := ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg)) if resMsg != nil { @@ -85,11 +90,13 @@ func (ddn *ddNode) filterFlushedSegmentInsertMessages(msg *msgstream.InsertMsg) return msg } -func newDDNode() *ddNode { +func newDDNode(clearSignal chan<- UniqueID, collID UniqueID) *ddNode { baseNode := BaseNode{} baseNode.SetMaxParallelism(Params.FlowGraphMaxQueueLength) return &ddNode{ - BaseNode: baseNode, + BaseNode: baseNode, + clearSignal: clearSignal, + collectionID: collID, } } diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index 674a8b844c..cd8babb1ea 100644 --- a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -74,7 +74,7 @@ func newIDLEDataNodeMock() *DataNode { return node } -func newHEALTHDataNodeMock(dmChannelName, ddChannelName string) *DataNode { +func newHEALTHDataNodeMock(dmChannelName string) *DataNode { var ctx context.Context if debug {