mirror of https://github.com/milvus-io/milvus.git
parent
7f2d7c1da3
commit
5a178fec04
|
@ -1,6 +1,7 @@
|
|||
# DataNode Flowgraph Recovery Design
|
||||
|
||||
update: 6.4.2021, by [Goose](https://github.com/XuanYang-cn)
|
||||
update: 6.21.2021, by [Goose](https://github.com/XuanYang-cn)
|
||||
|
||||
## 1. Common Sense
|
||||
A. 1 message stream to 1 vchannel, so there are 1 start position and 1 end position in 1 message pack
|
||||
|
@ -11,7 +12,11 @@ An optimization: update position of
|
|||
C. DataNode auto-flush is a valid flush.
|
||||
D. DDL messages are now in DML Vchannels.
|
||||
|
||||
## 2. Flowgraph Recovery
|
||||
## 2. Segments in Flowgraph
|
||||
|
||||
![segments](graphs/segments.png)
|
||||
|
||||
## 3. Flowgraph Recovery
|
||||
### A. Save checkpoints
|
||||
When a flowgraph flushes a segment, we need to save these things:
|
||||
- current segment's binlog paths,
|
||||
|
@ -27,7 +32,21 @@ Whether save successfully:
|
|||
### B. Recovery from a set of checkpoints
|
||||
1. We need all positions of all segments in this vchannel `p1, p2, ... pn`
|
||||
|
||||
[TBD] A design of WatchDmChannelReq
|
||||
A design of WatchDmChannelReq
|
||||
``` proto
|
||||
message VchannelInfo {
|
||||
int64 collectionID = 1;
|
||||
string channelName = 2;
|
||||
internal.MsgPosition seek_position = 3;
|
||||
repeated SegmentInfo unflushedSegments = 4;
|
||||
repeated int64 flushedSegments = 5;
|
||||
}
|
||||
|
||||
message WatchDmChannelsRequest {
|
||||
common.MsgBase base = 1;
|
||||
repeated VchannelInfo vchannels = 2;
|
||||
}
|
||||
```
|
||||
|
||||
2. We want to filter msgPacks based on these positions.
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
update: 5.21.2021, by [Goose](https://github.com/XuanYang-cn)
|
||||
update: 6.03.2021, by [Goose](https://github.com/XuanYang-cn)
|
||||
update: 6.21.2021, by [Goose](https://github.com/XuanYang-cn)
|
||||
|
||||
## What's DataNode?
|
||||
|
||||
|
@ -41,7 +42,7 @@ Vchannel is stateful because we don't want to process twice what's already proce
|
|||
already persistant. In DataNode's terminology, a message is processed if it's been flushed.
|
||||
|
||||
DataService tells DataNode stateful vchannel infos through RPC `WatchDmChannels`, so that DataNode won't process
|
||||
the same messages over and over again. So flowgraph needs ability to comsume messages in the middle of a vchannel.
|
||||
the same messages over and over again. So flowgraph needs ability to consume messages in the middle of a vchannel.
|
||||
|
||||
DataNode tells DataService vchannel states after each flush through RPC `SaveBinlogPaths`, so that DataService
|
||||
keep the vchannel states update.
|
||||
|
@ -51,7 +52,7 @@ keep the vchannel states update.
|
|||
|
||||
### 1. DataNode no longer interacts with Etcd except service registering
|
||||
|
||||
#### **O1-1** DataService rather than DataNode saves binlog paths into Etcd
|
||||
#### DataService rather than DataNode saves binlog paths into Etcd
|
||||
|
||||
![datanode_design](graphs/datanode_design_01.jpg)
|
||||
|
||||
|
@ -65,36 +66,23 @@ message ID2PathList {
|
|||
repeated string Paths = 2;
|
||||
}
|
||||
|
||||
message CheckPoint {
|
||||
int64 segmentID = 1;
|
||||
internal.MsgPosition position = 2;
|
||||
int64 num_of_rows = 3;
|
||||
}
|
||||
|
||||
message SaveBinlogPathsRequest {
|
||||
common.MsgBase base = 1;
|
||||
int64 segmentID = 2;
|
||||
int64 collectionID = 3;
|
||||
ID2PathList field2BinlogPaths = 4;
|
||||
ID2PathList coll2TsBinlogPaths = 5;
|
||||
ID2PathList coll2DdlBinlogPaths = 6;
|
||||
repeated internal.MsgPosition start_positions = 7;
|
||||
repeated internal.MsgPosition end_positions = 8;
|
||||
repeated ID2PathList field2BinlogPaths = 4;
|
||||
repeated CheckPoint checkPoints = 7;
|
||||
repeated SegmentStartPosition start_positions = 6;
|
||||
bool flushed = 7;
|
||||
}
|
||||
```
|
||||
|
||||
##### DataService Etcd Binlog Meta Design
|
||||
|
||||
The same as DataNode
|
||||
|
||||
```proto
|
||||
// key: ${prefix}/${segmentID}/${fieldID}/${idx}
|
||||
message SegmentFieldBinlogMeta {
|
||||
int64 fieldID = 1;
|
||||
string binlog_path = 2;
|
||||
}
|
||||
|
||||
// key: ${prefix}/${collectionID}/${idx}
|
||||
message DDLBinlogMeta {
|
||||
string ddl_binlog_path = 1;
|
||||
string ts_binlog_path = 2;
|
||||
}
|
||||
```
|
||||
|
||||
### 4. DataNode with collection with flowgraph with vchannel designs
|
||||
|
||||
#### The winner
|
||||
|
@ -107,22 +95,17 @@ message DDLBinlogMeta {
|
|||
Change `WatchDmChannelsRequest` proto.
|
||||
|
||||
``` proto
|
||||
message PositionPair {
|
||||
internal.MsgPosition start_position = 1;
|
||||
internal.MsgPosition end_position = 2;
|
||||
}
|
||||
|
||||
message VchannelPair {
|
||||
message VchannelInfo {
|
||||
int64 collectionID = 1;
|
||||
string dml_vchannel_name = 2;
|
||||
string ddl_vchannel_name = 3;
|
||||
PositionPair ddl_position = 4;
|
||||
PositionPair dml_position = 5;
|
||||
string channelName = 2;
|
||||
internal.MsgPosition seek_position = 3;
|
||||
repeated SegmentInfo unflushedSegments = 4;
|
||||
repeated int64 flushedSegments = 5;
|
||||
}
|
||||
|
||||
message WatchDmChannelsRequest {
|
||||
common.MsgBase base = 1;
|
||||
repeated VchannelPair vchannels = 2;
|
||||
repeated VchannelInfo vchannels = 2;
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -134,8 +117,9 @@ type DataNode struct {
|
|||
...
|
||||
vchan2Sync map[string]*dataSyncService
|
||||
vchan2FlushCh map[string]chan<- *flushMsg
|
||||
|
||||
clearSignal chan UniqueID
|
||||
...
|
||||
replica Replica // TODO remove
|
||||
}
|
||||
|
||||
// DataSyncService
|
||||
|
@ -147,7 +131,6 @@ type dataSyncService struct {
|
|||
idAllocator allocatorInterface
|
||||
msFactory msgstream.Factory
|
||||
collectionID UniqueID
|
||||
segmentIDs []UniqueID // getSegmentIDs() of Replica
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -155,24 +138,12 @@ DataNode Init -> Resigter to Etcd -> Discovery data service -> Discover master s
|
|||
|
||||
WatchDmChannels -> new dataSyncService -> HEALTH
|
||||
|
||||
```proto
|
||||
message WatchDmChannelsRequest {
|
||||
common.MsgBase base = 1;
|
||||
repeated VchannelPair vchannels = 2;
|
||||
}
|
||||
```
|
||||
`WatchDmChannels:`
|
||||
|
||||
1. If `DataNode.vchan2Sync` is empty, DataNode is in IDLE, `WatchDmChannels` will create new dataSyncService for every unique vchannel, then DataNode is in HEALTH.
|
||||
2. If vchannel name of `VchannelPair` is not in `DataNode.vchan2Sync`, create a new dataSyncService.
|
||||
3. If vchannel name of `VchannelPair` is in `DataNode.vchan2Sync`, ignore.
|
||||
|
||||
`newDataSyncService:`
|
||||
|
||||
```go
|
||||
func newDataSyncService(ctx context.Context, flushChan <-chan *flushMsg, replica Replica,
|
||||
alloc allocatorInterface, factory msgstream.Factory, vchanPair *datapb.VchannelPair) *dataSyncService
|
||||
|
||||
```
|
||||
|
||||
#### The boring design
|
||||
|
|
Binary file not shown.
After Width: | Height: | Size: 52 KiB |
|
@ -286,7 +286,7 @@ type DataNode interface {
|
|||
```go
|
||||
type WatchDmChannelRequest struct {
|
||||
Base *commonpb.MsgBase
|
||||
ChannelNames []string
|
||||
Vchannels []*VchannelInfo
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -326,5 +326,17 @@ type SegmentStatisticsMsg struct {
|
|||
BaseMsg
|
||||
SegmentStatistics
|
||||
}
|
||||
|
||||
```
|
||||
#### 8.3 DataNode Time Tick Channel
|
||||
|
||||
* *DataNode Tt Msg*
|
||||
|
||||
```go
|
||||
message DataNodeTtMsg {
|
||||
Base *commonpb.MsgBase
|
||||
ChannelName string
|
||||
Timestamp uint64
|
||||
}
|
||||
```
|
||||
|
||||
|
|
Loading…
Reference in New Issue