diff --git a/docs/design_docs/datanode_flowgraph_recovery_design_0604_2021.md b/docs/design_docs/datanode_flowgraph_recovery_design_0604_2021.md index 1016c90378..2bfbcefd75 100644 --- a/docs/design_docs/datanode_flowgraph_recovery_design_0604_2021.md +++ b/docs/design_docs/datanode_flowgraph_recovery_design_0604_2021.md @@ -1,6 +1,7 @@ # DataNode Flowgraph Recovery Design update: 6.4.2021, by [Goose](https://github.com/XuanYang-cn) +update: 6.21.2021, by [Goose](https://github.com/XuanYang-cn) ## 1. Common Sense A. 1 message stream to 1 vchannel, so there are 1 start position and 1 end position in 1 message pack @@ -11,7 +12,11 @@ An optimization: update position of C. DataNode auto-flush is a valid flush. D. DDL messages are now in DML Vchannels. -## 2. Flowgraph Recovery +## 2. Segments in Flowgraph + +![segments](graphs/segments.png) + +## 3. Flowgraph Recovery ### A. Save checkpoints When a flowgraph flushes a segment, we need to save these things: - current segment's binlog paths, @@ -27,7 +32,21 @@ Whether save successfully: ### B. Recovery from a set of checkpoints 1. We need all positions of all segments in this vchannel `p1, p2, ... pn` - [TBD] A design of WatchDmChannelReq +A design of WatchDmChannelReq +``` proto +message VchannelInfo { + int64 collectionID = 1; + string channelName = 2; + internal.MsgPosition seek_position = 3; + repeated SegmentInfo unflushedSegments = 4; + repeated int64 flushedSegments = 5; +} + +message WatchDmChannelsRequest { + common.MsgBase base = 1; + repeated VchannelInfo vchannels = 2; +} +``` 2. We want to filter msgPacks based on these positions. 
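The filtering step described in section B above can be sketched roughly as follows. This is a minimal illustration, not the DataNode implementation: `position`, `insertMsg`, `filterProcessed`, and `seekPosition` are hypothetical names, a position is reduced to a single timestamp, and treating the earliest checkpoint as the seek position is an assumption of this sketch rather than something the design doc states.

```go
package main

import "fmt"

// position is a simplified stand-in for internal.MsgPosition.
// Only a timestamp is modelled here (an assumption of this sketch).
type position struct {
	Timestamp uint64
}

// insertMsg is a hypothetical, trimmed-down message that belongs to one segment.
type insertMsg struct {
	SegmentID int64
	End       position // end position of the message pack carrying this message
}

// seekPosition returns the earliest checkpoint among all segments.
// Assumption: consumption restarts from this point so no segment misses data.
func seekPosition(checkpoints map[int64]position) (position, bool) {
	var earliest position
	found := false
	for _, cp := range checkpoints {
		if !found || cp.Timestamp < earliest.Timestamp {
			earliest = cp
			found = true
		}
	}
	return earliest, found
}

// filterProcessed drops messages already covered by their segment's checkpoint
// (p1, p2, ... pn in the text). Messages of segments without a checkpoint are kept.
func filterProcessed(msgs []insertMsg, checkpoints map[int64]position) []insertMsg {
	kept := make([]insertMsg, 0, len(msgs))
	for _, m := range msgs {
		if cp, ok := checkpoints[m.SegmentID]; ok && m.End.Timestamp <= cp.Timestamp {
			continue // already flushed for this segment
		}
		kept = append(kept, m)
	}
	return kept
}

func main() {
	checkpoints := map[int64]position{1: {Timestamp: 10}, 2: {Timestamp: 20}}
	msgs := []insertMsg{
		{SegmentID: 1, End: position{Timestamp: 5}},
		{SegmentID: 1, End: position{Timestamp: 15}},
		{SegmentID: 2, End: position{Timestamp: 15}},
		{SegmentID: 3, End: position{Timestamp: 1}},
	}
	if seek, ok := seekPosition(checkpoints); ok {
		fmt.Println("seek from timestamp", seek.Timestamp)
	}
	for _, m := range filterProcessed(msgs, checkpoints) {
		fmt.Printf("replay segment %d at %d\n", m.SegmentID, m.End.Timestamp)
	}
}
```

Keeping one checkpoint per segment, rather than one per vchannel, is what allows a segment flushed further ahead to skip messages that a slower segment in the same vchannel still has to replay.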
diff --git a/docs/design_docs/datanode_recovery_design_0513_2021.md b/docs/design_docs/datanode_recovery_design_0513_2021.md index efee9205a5..2bae195473 100644 --- a/docs/design_docs/datanode_recovery_design_0513_2021.md +++ b/docs/design_docs/datanode_recovery_design_0513_2021.md @@ -2,6 +2,7 @@ update: 5.21.2021, by [Goose](https://github.com/XuanYang-cn) update: 6.03.2021, by [Goose](https://github.com/XuanYang-cn) +update: 6.21.2021, by [Goose](https://github.com/XuanYang-cn) ## What's DataNode? @@ -41,7 +42,7 @@ Vchannel is stateful because we don't want to process twice what's already proce already persistant. In DataNode's terminology, a message is processed if it's been flushed. DataService tells DataNode stateful vchannel infos through RPC `WatchDmChannels`, so that DataNode won't process -the same messages over and over again. So flowgraph needs ability to comsume messages in the middle of a vchannel. +the same messages over and over again. So flowgraph needs ability to consume messages in the middle of a vchannel. DataNode tells DataService vchannel states after each flush through RPC `SaveBinlogPaths`, so that DataService keep the vchannel states update. @@ -51,7 +52,7 @@ keep the vchannel states update. ### 1. 
DataNode no longer interacts with Etcd except service registering -#### **O1-1** DataService rather than DataNode saves binlog paths into Etcd +#### DataService rather than DataNode saves binlog paths into Etcd ![datanode_design](graphs/datanode_design_01.jpg) @@ -65,36 +66,23 @@ message ID2PathList { repeated string Paths = 2; } +message CheckPoint { + int64 segmentID = 1; + internal.MsgPosition position = 2; + int64 num_of_rows = 3; +} + message SaveBinlogPathsRequest { common.MsgBase base = 1; int64 segmentID = 2; int64 collectionID = 3; - ID2PathList field2BinlogPaths = 4; - ID2PathList coll2TsBinlogPaths = 5; - ID2PathList coll2DdlBinlogPaths = 6; - repeated internal.MsgPosition start_positions = 7; - repeated internal.MsgPosition end_positions = 8; + repeated ID2PathList field2BinlogPaths = 4; + repeated CheckPoint checkPoints = 5; + repeated SegmentStartPosition start_positions = 6; + bool flushed = 7; } ``` -##### DataService Etcd Binlog Meta Design - -The same as DataNode - -```proto -// key: ${prefix}/${segmentID}/${fieldID}/${idx} -message SegmentFieldBinlogMeta { - int64 fieldID = 1; - string binlog_path = 2; -} - -// key: ${prefix}/${collectionID}/${idx} -message DDLBinlogMeta { - string ddl_binlog_path = 1; - string ts_binlog_path = 2; -} -``` - ### 4. DataNode with collection with flowgraph with vchannel designs #### The winner @@ -107,22 +95,17 @@ message DDLBinlogMeta { Change `WatchDmChannelsRequest` proto. 
``` proto -message PositionPair { - internal.MsgPosition start_position = 1; - internal.MsgPosition end_position = 2; -} - -message VchannelPair { +message VchannelInfo { int64 collectionID = 1; - string dml_vchannel_name = 2; - string ddl_vchannel_name = 3; - PositionPair ddl_position = 4; - PositionPair dml_position = 5; + string channelName = 2; + internal.MsgPosition seek_position = 3; + repeated SegmentInfo unflushedSegments = 4; + repeated int64 flushedSegments = 5; } message WatchDmChannelsRequest { common.MsgBase base = 1; - repeated VchannelPair vchannels = 2; + repeated VchannelInfo vchannels = 2; } ``` @@ -134,8 +117,9 @@ type DataNode struct { ... vchan2Sync map[string]*dataSyncService vchan2FlushCh map[string]chan<- *flushMsg + + clearSignal chan UniqueID ... - replica Replica // TODO remove } // DataSyncService @@ -147,7 +131,6 @@ type dataSyncService struct { idAllocator allocatorInterface msFactory msgstream.Factory collectionID UniqueID - segmentIDs []UniqueID // getSegmentIDs() of Replica } ``` @@ -155,24 +138,12 @@ DataNode Init -> Resigter to Etcd -> Discovery data service -> Discover master s WatchDmChannels -> new dataSyncService -> HEALTH -```proto -message WatchDmChannelsRequest { - common.MsgBase base = 1; - repeated VchannelPair vchannels = 2; -} -``` `WatchDmChannels:` 1. If `DataNode.vchan2Sync` is empty, DataNode is in IDLE, `WatchDmChannels` will create new dataSyncService for every unique vchannel, then DataNode is in HEALTH. 2. If vchannel name of `VchannelPair` is not in `DataNode.vchan2Sync`, create a new dataSyncService. 3. If vchannel name of `VchannelPair` is in `DataNode.vchan2Sync`, ignore. 
-`newDataSyncService:` - -```go -func newDataSyncService(ctx context.Context, flushChan <-chan *flushMsg, replica Replica, - alloc allocatorInterface, factory msgstream.Factory, vchanPair *datapb.VchannelPair) *dataSyncService - ``` #### The boring design diff --git a/docs/design_docs/graphs/segments.png b/docs/design_docs/graphs/segments.png new file mode 100644 index 0000000000..e6c5c989b3 Binary files /dev/null and b/docs/design_docs/graphs/segments.png differ diff --git a/docs/developer_guides/chap09_data_service.md b/docs/developer_guides/chap09_data_service.md index c1d6523947..77641224a3 100644 --- a/docs/developer_guides/chap09_data_service.md +++ b/docs/developer_guides/chap09_data_service.md @@ -286,7 +286,7 @@ type DataNode interface { ```go type WatchDmChannelRequest struct { Base *commonpb.MsgBase - ChannelNames []string + Vchannels []*VchannelInfo } ``` @@ -326,5 +326,17 @@ type SegmentStatisticsMsg struct { BaseMsg SegmentStatistics } + +``` +#### 8.3 DataNode Time Tick Channel + +* *DataNode Tt Msg* + +```go +type DataNodeTtMsg struct { + Base *commonpb.MsgBase + ChannelName string + Timestamp uint64 +} ```
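Returning to the three `WatchDmChannels` rules in datanode_recovery_design_0513_2021.md (IDLE when `vchan2Sync` is empty, one dataSyncService per unique vchannel, already-watched vchannels ignored), the behaviour could be sketched as below. This is a hedged illustration only: the trimmed `vchannelInfo` and `dataSyncService` types, the `nodeState` constants, the mutex, and the returned list of newly started channel names are all assumptions of this sketch and are not taken from the DataNode code.

```go
package main

import (
	"fmt"
	"sync"
)

// vchannelInfo is a trimmed, hypothetical mirror of the VchannelInfo proto.
type vchannelInfo struct {
	CollectionID int64
	ChannelName  string
}

// dataSyncService is a placeholder for the per-vchannel flowgraph owner.
type dataSyncService struct {
	vchan vchannelInfo
}

// nodeState models the IDLE and HEALTH states named in the design doc.
type nodeState int

const (
	stateIdle nodeState = iota
	stateHealth
)

type dataNode struct {
	mu         sync.Mutex
	state      nodeState
	vchan2Sync map[string]*dataSyncService
}

func newDataNode() *dataNode {
	return &dataNode{state: stateIdle, vchan2Sync: make(map[string]*dataSyncService)}
}

// watchDmChannels creates a dataSyncService for every vchannel that is not yet
// watched and ignores the rest. It returns the names of the newly started
// channels (a convenience added for this sketch).
func (n *dataNode) watchDmChannels(vchans []vchannelInfo) []string {
	n.mu.Lock()
	defer n.mu.Unlock()

	var started []string
	for _, vc := range vchans {
		if _, ok := n.vchan2Sync[vc.ChannelName]; ok {
			continue // rule 3: already watched, ignore
		}
		n.vchan2Sync[vc.ChannelName] = &dataSyncService{vchan: vc} // rules 1 and 2
		started = append(started, vc.ChannelName)
	}
	if len(n.vchan2Sync) > 0 {
		n.state = stateHealth
	}
	return started
}

func main() {
	node := newDataNode()
	fmt.Println(node.watchDmChannels([]vchannelInfo{{1, "vchan-a"}, {1, "vchan-b"}}))
	fmt.Println(node.watchDmChannels([]vchannelInfo{{1, "vchan-b"}, {2, "vchan-c"}}))
}
```

Keying the map by vchannel name makes a repeated `WatchDmChannels` call idempotent, which matters if DataService retries the RPC.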