mirror of https://github.com/milvus-io/milvus.git
data node, save binlog to minIO, and let data service save these meta (#5618)
Signed-off-by: yefu.chen <yefu.chen@zilliz.com>pull/5779/head
parent
91ef35bad4
commit
3c70675313
|
@ -13,10 +13,13 @@ package datanode
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/msgstream"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/types"
|
||||
"github.com/milvus-io/milvus/internal/util/flowgraph"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -30,6 +33,7 @@ type dataSyncService struct {
|
|||
idAllocator allocatorInterface
|
||||
msFactory msgstream.Factory
|
||||
collectionID UniqueID
|
||||
dataService types.DataService
|
||||
}
|
||||
|
||||
func newDataSyncService(ctx context.Context,
|
||||
|
@ -83,6 +87,43 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelInfo) {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
saveBinlog := func(fu *autoFlushUnit) error {
|
||||
id2path := []*datapb.ID2PathList{}
|
||||
checkPoints := []*datapb.CheckPoint{}
|
||||
for k, v := range fu.field2Path {
|
||||
id2path = append(id2path, &datapb.ID2PathList{ID: k, Paths: []string{v}})
|
||||
}
|
||||
for k, v := range fu.openSegCheckpoints {
|
||||
v := v
|
||||
checkPoints = append(checkPoints, &datapb.CheckPoint{
|
||||
SegmentID: k,
|
||||
NumOfRows: fu.numRows[k],
|
||||
Position: &v,
|
||||
})
|
||||
}
|
||||
|
||||
req := &datapb.SaveBinlogPathsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: 0, //TOD msg type
|
||||
MsgID: 0, //TODO,msg id
|
||||
Timestamp: 0, //TODO, time stamp
|
||||
SourceID: Params.NodeID,
|
||||
},
|
||||
SegmentID: fu.segID,
|
||||
CollectionID: 0, //TODO
|
||||
Field2BinlogPaths: id2path,
|
||||
CheckPoints: checkPoints,
|
||||
Flushed: fu.flushed,
|
||||
}
|
||||
rsp, err := dsService.dataService.SaveBinlogPaths(dsService.ctx, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("data service save bin log path failed, err = %w", err)
|
||||
}
|
||||
if rsp.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetChannelName(), vchanPair.GetCheckPoints())
|
||||
var ddNode Node = newDDNode()
|
||||
var insertBufferNode Node = newInsertBufferNode(
|
||||
|
@ -91,7 +132,7 @@ func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelInfo) {
|
|||
dsService.msFactory,
|
||||
dsService.idAllocator,
|
||||
dsService.flushChan,
|
||||
nil, //TODO,=================== call data service save binlog =========
|
||||
saveBinlog,
|
||||
)
|
||||
|
||||
dsService.fg.AddNode(dmStreamNode)
|
||||
|
|
|
@ -64,10 +64,11 @@ type insertBufferNode struct {
|
|||
}
|
||||
|
||||
type autoFlushUnit struct {
|
||||
collID UniqueID
|
||||
segID UniqueID
|
||||
numRows int64
|
||||
field2Path map[UniqueID]string
|
||||
openSegCheckpoints map[UniqueID]internalpb.MsgPosition
|
||||
numRows map[UniqueID]int64
|
||||
flushed bool
|
||||
}
|
||||
|
||||
|
@ -503,15 +504,23 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
log.Debug("segment is empty")
|
||||
continue
|
||||
}
|
||||
segSta, err := ibNode.replica.getSegmentStatisticsUpdates(fu.segID)
|
||||
if err != nil {
|
||||
log.Debug("getSegmentStatisticsUpdates failed", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
fu.numRows = segSta.NumRows
|
||||
fu.openSegCheckpoints = ibNode.replica.listOpenSegmentCheckPoint()
|
||||
fu.numRows = make(map[UniqueID]int64)
|
||||
for k := range fu.openSegCheckpoints {
|
||||
segStat, err := ibNode.replica.getSegmentStatisticsUpdates(k)
|
||||
if err != nil {
|
||||
log.Debug("getSegmentStatisticsUpdates failed", zap.Error(err))
|
||||
fu.numRows = nil
|
||||
break
|
||||
}
|
||||
fu.numRows[k] = segStat.NumRows
|
||||
}
|
||||
if fu.numRows == nil {
|
||||
log.Debug("failed on get segment num rows")
|
||||
break
|
||||
}
|
||||
fu.flushed = false
|
||||
if ibNode.dsSaveBinlog(&fu) != nil {
|
||||
if err := ibNode.dsSaveBinlog(&fu); err != nil {
|
||||
log.Debug("data service save bin log path failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
@ -523,19 +532,30 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
currentSegID := fmsg.segmentID
|
||||
log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID))
|
||||
|
||||
segSta, err := ibNode.replica.getSegmentStatisticsUpdates(currentSegID)
|
||||
if err != nil {
|
||||
log.Debug("getSegmentStatisticsUpdates failed", zap.Error(err))
|
||||
checkPoints := ibNode.replica.listOpenSegmentCheckPoint()
|
||||
numRows := make(map[UniqueID]int64)
|
||||
for k := range checkPoints {
|
||||
segStat, err := ibNode.replica.getSegmentStatisticsUpdates(k)
|
||||
if err != nil {
|
||||
log.Debug("getSegmentStatisticsUpdates failed", zap.Error(err))
|
||||
numRows = nil
|
||||
break
|
||||
}
|
||||
numRows[k] = segStat.NumRows
|
||||
}
|
||||
if numRows == nil {
|
||||
log.Debug("failed on get segment num rows")
|
||||
break
|
||||
}
|
||||
|
||||
if ibNode.insertBuffer.size(currentSegID) <= 0 {
|
||||
log.Debug(".. Buffer empty ...")
|
||||
ibNode.dsSaveBinlog(&autoFlushUnit{
|
||||
collID: fmsg.collectionID,
|
||||
segID: currentSegID,
|
||||
numRows: segSta.NumRows,
|
||||
field2Path: nil,
|
||||
openSegCheckpoints: ibNode.replica.listOpenSegmentCheckPoint(),
|
||||
numRows: numRows,
|
||||
field2Path: map[UniqueID]string{},
|
||||
openSegCheckpoints: checkPoints,
|
||||
flushed: true,
|
||||
})
|
||||
fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}}
|
||||
|
@ -581,8 +601,8 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
fu := <-finishCh
|
||||
close(finishCh)
|
||||
if fu.field2Path != nil {
|
||||
fu.numRows = segSta.NumRows
|
||||
fu.openSegCheckpoints = ibNode.replica.listOpenSegmentCheckPoint()
|
||||
fu.numRows = numRows
|
||||
fu.openSegCheckpoints = checkPoints
|
||||
fu.flushed = true
|
||||
if ibNode.dsSaveBinlog(&fu) != nil {
|
||||
log.Debug("data service save bin log path failed", zap.Error(err))
|
||||
|
@ -703,7 +723,7 @@ func flushSegment(collMeta *etcdpb.CollectionMeta, segID, partitionID, collID Un
|
|||
}
|
||||
|
||||
replica.setSegmentCheckPoint(segID)
|
||||
flushUnit <- autoFlushUnit{segID: segID, field2Path: field2Path}
|
||||
flushUnit <- autoFlushUnit{collID: collID, segID: segID, field2Path: field2Path}
|
||||
clearFn(true)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue