mirror of https://github.com/milvus-io/milvus.git
Fix DeltaLogs empty yet appending a deltaInfo bug (#11984)
- Add more logs for startpositions Fixes: #11970 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/12006/head
parent
cc0d6dc6fd
commit
7dbb4d10e5
|
@ -83,7 +83,7 @@ func printSegmentInfo(info *datapb.SegmentInfo) {
|
||||||
}
|
}
|
||||||
if info.DmlPosition != nil {
|
if info.DmlPosition != nil {
|
||||||
dmlTime, _ := tsoutil.ParseTS(info.DmlPosition.Timestamp)
|
dmlTime, _ := tsoutil.ParseTS(info.DmlPosition.Timestamp)
|
||||||
fmt.Printf("Dml Position ID: %v, time: %s\n", info.StartPosition.MsgID, dmlTime.Format(tsPrintFormat))
|
fmt.Printf("Dml Position ID: %v, time: %s\n", info.GetStartPosition().GetMsgID(), dmlTime.Format(tsPrintFormat))
|
||||||
} else {
|
} else {
|
||||||
fmt.Println("Dml Position: nil")
|
fmt.Println("Dml Position: nil")
|
||||||
}
|
}
|
||||||
|
|
|
@ -301,6 +301,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
|
||||||
zap.Int64("segmentID", req.GetSegmentID()),
|
zap.Int64("segmentID", req.GetSegmentID()),
|
||||||
zap.Bool("isFlush", req.GetFlushed()),
|
zap.Bool("isFlush", req.GetFlushed()),
|
||||||
zap.Bool("isDropped", req.GetDropped()),
|
zap.Bool("isDropped", req.GetDropped()),
|
||||||
|
zap.Any("startPositions", req.GetStartPositions()),
|
||||||
zap.Any("checkpoints", req.GetCheckPoints()))
|
zap.Any("checkpoints", req.GetCheckPoints()))
|
||||||
|
|
||||||
// validate
|
// validate
|
||||||
|
|
|
@ -403,9 +403,14 @@ func (t *compactionTask) compact() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
cpaths.deltaInfo.DeltaLogSize = deltaBuf.size
|
var deltaLogs []*datapb.DeltaLogInfo
|
||||||
cpaths.deltaInfo.TimestampFrom = deltaBuf.tsFrom
|
if len(cpaths.deltaInfo.GetDeltaLogPath()) > 0 {
|
||||||
cpaths.deltaInfo.TimestampTo = deltaBuf.tsTo
|
cpaths.deltaInfo.DeltaLogSize = deltaBuf.size
|
||||||
|
cpaths.deltaInfo.TimestampFrom = deltaBuf.tsFrom
|
||||||
|
cpaths.deltaInfo.TimestampTo = deltaBuf.tsTo
|
||||||
|
|
||||||
|
deltaLogs = append(deltaLogs, cpaths.deltaInfo)
|
||||||
|
}
|
||||||
|
|
||||||
pack := &datapb.CompactionResult{
|
pack := &datapb.CompactionResult{
|
||||||
PlanID: t.plan.GetPlanID(),
|
PlanID: t.plan.GetPlanID(),
|
||||||
|
@ -413,8 +418,7 @@ func (t *compactionTask) compact() error {
|
||||||
InsertLogs: cpaths.inPaths,
|
InsertLogs: cpaths.inPaths,
|
||||||
Field2StatslogPaths: cpaths.statsPaths,
|
Field2StatslogPaths: cpaths.statsPaths,
|
||||||
NumOfRows: numRows,
|
NumOfRows: numRows,
|
||||||
|
Deltalogs: deltaLogs,
|
||||||
Deltalogs: []*datapb.DeltaLogInfo{cpaths.deltaInfo},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
status, err := t.dc.CompleteCompaction(ctxTimeout, pack)
|
status, err := t.dc.CompleteCompaction(ctxTimeout, pack)
|
||||||
|
@ -446,7 +450,11 @@ func (t *compactionTask) compact() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
ti.injectOver <- true
|
ti.injectOver <- true
|
||||||
log.Info("compaction done", zap.Int64("planID", t.plan.GetPlanID()))
|
log.Info("compaction done", zap.Int64("planID", t.plan.GetPlanID()),
|
||||||
|
zap.Any("num of binlog paths", len(cpaths.inPaths)),
|
||||||
|
zap.Any("num of stats paths", len(cpaths.statsPaths)),
|
||||||
|
zap.Any("deltalog paths", cpaths.deltaInfo.GetDeltaLogPath()),
|
||||||
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -454,9 +454,12 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
|
||||||
Position: pack.pos,
|
Position: pack.pos,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
startPos := dsService.replica.listNewSegmentsStartPositions()
|
||||||
|
|
||||||
log.Debug("SaveBinlogPath",
|
log.Debug("SaveBinlogPath",
|
||||||
zap.Int64("SegmentID", pack.segmentID),
|
zap.Int64("SegmentID", pack.segmentID),
|
||||||
zap.Int64("CollectionID", dsService.collectionID),
|
zap.Int64("CollectionID", dsService.collectionID),
|
||||||
|
zap.Any("startPos", startPos),
|
||||||
zap.Int("Length of Field2BinlogPaths", len(fieldInsert)),
|
zap.Int("Length of Field2BinlogPaths", len(fieldInsert)),
|
||||||
zap.Int("Length of Field2Stats", len(fieldStats)),
|
zap.Int("Length of Field2Stats", len(fieldStats)),
|
||||||
zap.Int("Length of Field2Deltalogs", len(deltaInfos)),
|
zap.Int("Length of Field2Deltalogs", len(deltaInfos)),
|
||||||
|
@ -477,7 +480,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
|
||||||
|
|
||||||
CheckPoints: checkPoints,
|
CheckPoints: checkPoints,
|
||||||
|
|
||||||
StartPositions: dsService.replica.listNewSegmentsStartPositions(),
|
StartPositions: startPos,
|
||||||
Flushed: pack.flushed,
|
Flushed: pack.flushed,
|
||||||
Dropped: pack.dropped,
|
Dropped: pack.dropped,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue