mirror of https://github.com/milvus-io/milvus.git
Save stats and delta logs from DataNode (#10159)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/10168/head
parent
6a886d63f2
commit
ed4e516ce7
|
@ -159,6 +159,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
|
|||
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, minIOKV, dsService.replica, func(pack *segmentFlushPack) error {
|
||||
fieldInsert := []*datapb.FieldBinlog{}
|
||||
fieldStats := []*datapb.FieldBinlog{}
|
||||
deltaInfos := make([]*datapb.DeltaLogInfo, len(pack.deltaLogs))
|
||||
checkPoints := []*datapb.CheckPoint{}
|
||||
for k, v := range pack.insertLogs {
|
||||
fieldInsert = append(fieldInsert, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}})
|
||||
|
@ -166,6 +167,10 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
|
|||
for k, v := range pack.statsLogs {
|
||||
fieldStats = append(fieldStats, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}})
|
||||
}
|
||||
for _, delData := range pack.deltaLogs {
|
||||
deltaInfos = append(deltaInfos, &datapb.DeltaLogInfo{RecordEntries: uint64(delData.size), TimestampFrom: delData.tsFrom, TimestampTo: delData.tsTo, DeltaLogSize: delData.fileSize})
|
||||
}
|
||||
|
||||
// only current segment checkpoint info,
|
||||
updates, _ := dsService.replica.getSegmentStatisticsUpdates(pack.segmentID)
|
||||
checkPoints = append(checkPoints, &datapb.CheckPoint{
|
||||
|
@ -187,10 +192,12 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
|
|||
Timestamp: 0, //TODO time stamp
|
||||
SourceID: Params.NodeID,
|
||||
},
|
||||
SegmentID: pack.segmentID,
|
||||
CollectionID: dsService.collectionID,
|
||||
Field2BinlogPaths: fieldInsert,
|
||||
//TODO WIP add statslog and deltalog
|
||||
SegmentID: pack.segmentID,
|
||||
CollectionID: dsService.collectionID,
|
||||
Field2BinlogPaths: fieldInsert,
|
||||
Field2StatslogPaths: fieldStats,
|
||||
Deltalogs: deltaInfos,
|
||||
|
||||
CheckPoints: checkPoints,
|
||||
|
||||
StartPositions: dsService.replica.listNewSegmentsStartPositions(),
|
||||
|
|
|
@ -50,10 +50,11 @@ type deleteNode struct {
|
|||
// DelDataBuf buffers insert data, monitoring buffer size and limit
|
||||
// size and limit both indicate numOfRows
|
||||
type DelDataBuf struct {
|
||||
delData *DeleteData
|
||||
size int64
|
||||
tsFrom Timestamp
|
||||
tsTo Timestamp
|
||||
delData *DeleteData
|
||||
size int64
|
||||
tsFrom Timestamp
|
||||
tsTo Timestamp
|
||||
fileSize int64
|
||||
}
|
||||
|
||||
func (ddb *DelDataBuf) updateSize(size int64) {
|
||||
|
|
|
@ -244,6 +244,7 @@ func (m *rendezvousFlushManager) flushDelData(data *DelDataBuf, segmentID Unique
|
|||
blobKey, _ := m.genKey(false, collID, partID, segmentID, logID)
|
||||
blobPath := path.Join(Params.DeleteBinlogRootPath, blobKey)
|
||||
kvs := map[string]string{blobPath: string(blob.Value[:])}
|
||||
data.fileSize = int64(len(blob.Value))
|
||||
log.Debug("delete blob path", zap.String("path", blobPath))
|
||||
|
||||
m.getFlushQueue(segmentID).enqueueDelFlush(&flushBufferDeleteTask{
|
||||
|
|
|
@ -88,7 +88,11 @@ func (t *flushTaskRunner) runFlushInsert(task flushInsertTask, binlogs, statslog
|
|||
// runFlushDel execute flush delete task with once and retry
|
||||
func (t *flushTaskRunner) runFlushDel(task flushDeleteTask, deltaLogs *DelDataBuf) {
|
||||
t.deleteOnce.Do(func() {
|
||||
t.deltaLogs = []*DelDataBuf{deltaLogs}
|
||||
if deltaLogs == nil {
|
||||
t.deltaLogs = []*DelDataBuf{}
|
||||
} else {
|
||||
t.deltaLogs = []*DelDataBuf{deltaLogs}
|
||||
}
|
||||
go func() {
|
||||
err := errStart
|
||||
for err != nil {
|
||||
|
|
Loading…
Reference in New Issue