Add docs in package datanode (#5117)

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/5145/head
XuanYang-cn 2021-05-08 15:24:12 +08:00 committed by GitHub
parent 5f0006d0f9
commit eb557b289b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 77 additions and 21 deletions

View File

@ -20,7 +20,7 @@ spec:
ephemeral-storage: "20Gi"
requests:
cpu: "4"
memory: 8Gi
memory: 12Gi
ephemeral-storage: "20Gi"
volumeMounts:
- mountPath: /docker-graph

3
go.mod
View File

@ -6,6 +6,9 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/antonmedv/expr v1.8.9
github.com/apache/pulsar-client-go v0.4.0
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 // indirect
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect
github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect

View File

@ -20,9 +20,10 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
)
// ddl binlog meta key:
// binlogMeta persists binlog paths into etcd.
// ddl binlog etcd meta key:
// ${prefix}/${collectionID}/${idx}
// segment binlog meta key:
// segment binlog etcd meta key:
// ${prefix}/${segmentID}/${fieldID}/${idx}
type binlogMeta struct {
client kv.TxnKV // etcd kv
@ -37,8 +38,9 @@ func NewBinlogMeta(kv kv.TxnKV, idAllocator allocatorInterface) (*binlogMeta, er
return mt, nil
}
// if alloc is true, the returned keys will have a generated-unique ID at the end.
// if alloc is false, the returned keys will only consist of provided ids.
// genKey gives a valid key string for lists of UniqueIDs:
// if alloc is true, the returned keys will have a generated-unique ID at the end.
// if alloc is false, the returned keys will only consist of provided ids.
func (bm *binlogMeta) genKey(alloc bool, ids ...UniqueID) (key string, err error) {
if alloc {
idx, err := bm.idAllocator.allocID()
@ -57,22 +59,25 @@ func (bm *binlogMeta) genKey(alloc bool, ids ...UniqueID) (key string, err error
return
}
// SaveSegmentBinlogMetaTxn stores all fields' binlog paths of a segment in a transaction.
// segment binlog etcd meta key:
// ${prefix}/${segmentID}/${fieldID}/${idx}
func (bm *binlogMeta) SaveSegmentBinlogMetaTxn(segmentID UniqueID, field2Path map[UniqueID]string) error {
kvs := make(map[string]string, len(field2Path))
etcdKey2binlogPath := make(map[string]string, len(field2Path))
for fieldID, p := range field2Path {
key, err := bm.genKey(true, segmentID, fieldID)
if err != nil {
return err
}
v := proto.MarshalTextString(&datapb.SegmentFieldBinlogMeta{
binlogPath := proto.MarshalTextString(&datapb.SegmentFieldBinlogMeta{
FieldID: fieldID,
BinlogPath: p,
})
kvs[path.Join(Params.SegFlushMetaSubPath, key)] = v
etcdKey2binlogPath[path.Join(Params.SegFlushMetaSubPath, key)] = binlogPath
}
return bm.client.MultiSave(kvs)
return bm.client.MultiSave(etcdKey2binlogPath)
}
func (bm *binlogMeta) getFieldBinlogMeta(segmentID UniqueID,
@ -123,21 +128,21 @@ func (bm *binlogMeta) getSegmentBinlogMeta(segmentID UniqueID) (metas []*datapb.
return
}
// SaveDDLBinlogMetaTxn stores timestamp and ddl binlog path pair into etcd in a transaction.
// ddl binlog meta key:
// ${prefix}/${collectionID}/${idx}
// --- DDL ---
func (bm *binlogMeta) SaveDDLBinlogMetaTxn(collID UniqueID, tsPath string, ddlPath string) error {
k, err := bm.genKey(true, collID)
uniqueKey, err := bm.genKey(true, collID)
if err != nil {
return err
}
v := proto.MarshalTextString(&datapb.DDLBinlogMeta{
binlogPathPair := proto.MarshalTextString(&datapb.DDLBinlogMeta{
DdlBinlogPath: ddlPath,
TsBinlogPath: tsPath,
})
return bm.client.Save(path.Join(Params.DDLFlushMetaSubPath, k), v)
return bm.client.Save(path.Join(Params.DDLFlushMetaSubPath, uniqueKey), binlogPathPair)
}
func (bm *binlogMeta) getDDLBinlogMete(collID UniqueID) (metas []*datapb.DDLBinlogMeta, err error) {

View File

@ -17,6 +17,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
// Collection stuct is the data structure of collections in data node replica.
type Collection struct {
schema *schemapb.CollectionSchema
id UniqueID

View File

@ -44,6 +44,7 @@ type Replica interface {
getSegmentByID(segmentID UniqueID) (*Segment, error)
}
// Segment is the data structure of segments in data node replica.
type Segment struct {
segmentID UniqueID
collectionID UniqueID
@ -60,6 +61,8 @@ type Segment struct {
channelName string
}
// CollectionSegmentReplica is the data replication of persistent data in datanode.
// It implements `Replica` interface.
type CollectionSegmentReplica struct {
mu sync.RWMutex
segments map[UniqueID]*Segment
@ -89,6 +92,9 @@ func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Se
}
// `addSegment` add a new segment into replica when data node see the segment
// for the first time in insert channels. It sets the startPosition of a segment, and
// flags `isNew=true`
func (replica *CollectionSegmentReplica) addSegment(
segmentID UniqueID,
collID UniqueID,
@ -179,6 +185,7 @@ func (replica *CollectionSegmentReplica) setEndPosition(segmentID UniqueID, endP
return fmt.Errorf("There's no segment %v", segmentID)
}
// `updateStatistics` updates the number of rows of a segment in replica.
func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error {
replica.mu.Lock()
defer replica.mu.Unlock()
@ -193,6 +200,9 @@ func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, nu
return fmt.Errorf("There's no segment %v", segmentID)
}
// `getSegmentStatisticsUpdates` gives current segment's statistics updates.
// if the segment's flag `isNew` is true, updates will contain a valid start position.
// if the segment's flag `isFlushed` is true, updates will contain a valid end position.
func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
replica.mu.Lock()
defer replica.mu.Unlock()

View File

@ -9,6 +9,9 @@
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
// Package datanode implements data persistence logic.
//
// Data node persists definition language (ddl) strings and insert logs into persistent storage like minIO/S3.
package datanode
import (
@ -37,6 +40,22 @@ const (
RPCConnectionTimeout = 30 * time.Second
)
// DataNode struct communicates with outside services and unioun all
// services of data node.
//
// DataNode struct implements `types.Component`, `types.DataNode` interfaces.
// `dataSyncService` controls flowgraph in datanode.
// `metaService` initialize collections from master service when data node starts.
// `masterService` holds a grpc client of master service.
// `dataService` holds a grpc client of data service.
//
// `NodeID` is unique to each data node.
//
// `State` is current statement of this data node, indicating whether it's healthy.
//
// `flushChan` transfer flush messages from data service to flowgraph of data node.
//
// `replica` holds replications of persistent data, including collections and segments.
type DataNode struct {
ctx context.Context
cancel context.CancelFunc
@ -59,6 +78,7 @@ type DataNode struct {
msFactory msgstream.Factory
}
// NewDataNode will return a DataNode with abnormal state.
func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
rand.Seed(time.Now().UnixNano())
ctx2, cancel2 := context.WithCancel(ctx)
@ -79,6 +99,7 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
return node
}
// SetMasterServiceInterface sets master service's grpc client, error is returned if repeatedly set.
func (node *DataNode) SetMasterServiceInterface(ms types.MasterService) error {
switch {
case ms == nil, node.masterService != nil:
@ -89,6 +110,7 @@ func (node *DataNode) SetMasterServiceInterface(ms types.MasterService) error {
}
}
// SetDataServiceInterface sets data service's grpc client, error is returned if repeatedly set.
func (node *DataNode) SetDataServiceInterface(ds types.DataService) error {
switch {
case ds == nil, node.dataService != nil:
@ -99,7 +121,16 @@ func (node *DataNode) SetDataServiceInterface(ds types.DataService) error {
}
}
// Suppose dataservice is in INITIALIZING
// Init function supposes data service is in INITIALIZING state.
//
// In Init process, data node will register itself to data service with its node id
// and address. Therefore, `SetDataServiceInterface()` must be called before this func.
// Registering return several channel names data node need.
//
// After registering, data node will wait until data service calls `WatchDmChannels`
// for `RPCConnectionTimeout` ms.
//
// At last, data node initializes its `dataSyncService` and `metaService`.
func (node *DataNode) Init() error {
ctx := context.Background()
@ -121,13 +152,6 @@ func (node *DataNode) Init() error {
return fmt.Errorf("Receive error when registering data node, msg: %s", resp.Status.Reason)
}
select {
case <-time.After(RPCConnectionTimeout):
return errors.New("Get DmChannels failed in 30 seconds")
case <-node.watchDm:
log.Debug("insert channel names set")
}
for _, kv := range resp.InitParams.StartParams {
switch kv.Key {
case "DDChannelName":
@ -143,6 +167,13 @@ func (node *DataNode) Init() error {
}
}
select {
case <-time.After(RPCConnectionTimeout):
return errors.New("Get DmChannels failed in 30 seconds")
case <-node.watchDm:
log.Debug("insert channel names set")
}
replica := newReplica()
var alloc allocatorInterface = newAllocator(node.masterService)
@ -158,6 +189,7 @@ func (node *DataNode) Init() error {
return nil
}
// Start `metaService` and `dataSyncService` and update state to HEALTHY
func (node *DataNode) Start() error {
node.metaService.init()
go node.dataSyncService.start()
@ -169,6 +201,7 @@ func (node *DataNode) UpdateStateCode(code internalpb.StateCode) {
node.State.Store(code)
}
// WatchDmChannels set insert channel names data node subscribs to.
func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
@ -205,6 +238,7 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.Compo
return states, nil
}
// FlushSegments packs flush messages into flowgraph through flushChan.
func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
log.Debug("FlushSegments ...", zap.Int("num", len(req.SegmentIDs)))
ids := make([]UniqueID, 0)

View File

@ -26,6 +26,9 @@ import (
"github.com/milvus-io/milvus/internal/proto/milvuspb"
)
// metaService initialize replica collections in data node from master service.
// Initializing replica collections happens on data node starting. It depends on
// a healthy master service and a valid master service grpc client.
type metaService struct {
ctx context.Context
replica Replica