fix: Fix bug where binlogs were already flushed for the new segment during pack (#34762)

issue: #34703

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
pull/34790/head
cai.zhang 2024-07-18 09:15:48 +08:00 committed by GitHub
parent 4939f82d4f
commit 0c0ca4cf0e
1 changed file with 33 additions and 20 deletions
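The core of the fix, sketched below as a small stand-alone Go program: flushed binlogs and flushed row counts are tracked per segment ID instead of per buffer, so packing the current segment consumes and clears only that segment's entries, and binlogs already flushed for the next (refreshed) segment are no longer swept up or reset. The type, field, and function names in this sketch are simplified stand-ins, not the actual ClusterBuffer / datapb API.

package main

import "fmt"

// Illustrative aliases; the real code uses typeutil.UniqueID for both.
type segmentID = int64
type fieldID = int64

// clusterBuffer is a simplified stand-in for the real ClusterBuffer:
// bookkeeping is keyed by segment ID first, then field ID.
type clusterBuffer struct {
	flushedBinlogs map[segmentID]map[fieldID][]string // segID -> fieldID -> binlog paths
	flushedRowNum  map[segmentID]int64                // segID -> flushed row count
}

// flush records binlogs under whichever segment the writer is currently on.
func (b *clusterBuffer) flush(seg segmentID, field fieldID, path string, rows int64) {
	if b.flushedBinlogs[seg] == nil {
		b.flushedBinlogs[seg] = make(map[fieldID][]string)
	}
	b.flushedBinlogs[seg][field] = append(b.flushedBinlogs[seg][field], path)
	b.flushedRowNum[seg] += rows
}

// pack seals one segment and clears only that segment's cache, so entries
// that already belong to a newer segment survive (this was the bug).
func (b *clusterBuffer) pack(seg segmentID) {
	fmt.Printf("pack segment %d: %d rows, %d fields\n",
		seg, b.flushedRowNum[seg], len(b.flushedBinlogs[seg]))
	delete(b.flushedBinlogs, seg)
	delete(b.flushedRowNum, seg)
}

func main() {
	b := &clusterBuffer{
		flushedBinlogs: map[segmentID]map[fieldID][]string{},
		flushedRowNum:  map[segmentID]int64{},
	}
	b.flush(100, 1, "log/100/1/a", 500) // binlogs for segment 100
	b.flush(101, 1, "log/101/1/b", 200) // writer already rolled to segment 101
	b.pack(100)                         // packing 100 must not drop 101's binlogs
	fmt.Println("segment 101 binlogs still cached:", len(b.flushedBinlogs[101]))
}

With the previous per-buffer maps, packBufferToSegment cleared flushedBinlogs and reset flushedRowNum wholesale, which dropped entries that the refreshed writer had already flushed for the new segment; the per-segment maps in the diff below remove that coupling.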


@@ -108,9 +108,10 @@ type ClusterBuffer struct {
 	bufferMemorySize     atomic.Int64
-	flushedRowNum        atomic.Int64
+	flushedRowNum        map[typeutil.UniqueID]atomic.Int64
 	currentSegmentRowNum atomic.Int64
-	flushedBinlogs map[typeutil.UniqueID]*datapb.FieldBinlog
+	// segID -> fieldID -> binlogs
+	flushedBinlogs map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog
 	uploadedSegments     []*datapb.CompactionSegment
 	uploadedSegmentStats map[typeutil.UniqueID]storage.SegmentStats
@@ -293,7 +294,8 @@ func (t *clusteringCompactionTask) getScalarAnalyzeResult(ctx context.Context) e
 	}
 	buffer := &ClusterBuffer{
 		id:                      id,
-		flushedBinlogs:          make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
+		flushedRowNum:           map[typeutil.UniqueID]atomic.Int64{},
+		flushedBinlogs:          make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
 		uploadedSegments:        make([]*datapb.CompactionSegment, 0),
 		uploadedSegmentStats:    make(map[typeutil.UniqueID]storage.SegmentStats, 0),
 		clusteringKeyFieldStats: fieldStats,
@@ -346,7 +348,8 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
 	fieldStats.SetVectorCentroids(storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroid))
 	clusterBuffer := &ClusterBuffer{
 		id:                      id,
-		flushedBinlogs:          make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
+		flushedRowNum:           map[typeutil.UniqueID]atomic.Int64{},
+		flushedBinlogs:          make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
 		uploadedSegments:        make([]*datapb.CompactionSegment, 0),
 		uploadedSegmentStats:    make(map[typeutil.UniqueID]storage.SegmentStats, 0),
 		clusteringKeyFieldStats: fieldStats,
@@ -587,16 +590,18 @@ func (t *clusteringCompactionTask) mappingSegment(
 				zap.Int64("currentBufferWrittenMemorySize", currentBufferWrittenMemorySize))
 			// trigger flushBinlog
-			currentBufferNum := clusterBuffer.writer.GetRowNum()
-			if clusterBuffer.currentSegmentRowNum.Load()+currentBufferNum > t.plan.GetMaxSegmentRows() ||
+			currentSegmentNumRows := clusterBuffer.currentSegmentRowNum.Load()
+			if currentSegmentNumRows > t.plan.GetMaxSegmentRows() ||
 				clusterBuffer.writer.IsFull() {
 				// reach segment/binlog max size
 				t.clusterBufferLocks.Lock(clusterBuffer.id)
 				writer := clusterBuffer.writer
 				pack, _ := t.refreshBufferWriter(clusterBuffer)
 				log.Debug("buffer need to flush", zap.Int("bufferID", clusterBuffer.id),
-					zap.Bool("pack", pack), zap.Int64("buffer num", currentBufferNum),
-					zap.Int64("clusterBuffer.flushedRowNum.Load()", clusterBuffer.flushedRowNum.Load()))
+					zap.Bool("pack", pack),
+					zap.Int64("current segment", writer.GetSegmentID()),
+					zap.Int64("current segment num rows", currentSegmentNumRows),
+					zap.Int64("writer num", writer.GetRowNum()))
 				t.clusterBufferLocks.Unlock(clusterBuffer.id)

 				t.flushChan <- FlushSignal{
@@ -805,14 +810,17 @@ func (t *clusteringCompactionTask) flushAll(ctx context.Context) error {
 }

 func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter) error {
-	if len(buffer.flushedBinlogs) == 0 {
+	if binlogs, ok := buffer.flushedBinlogs[writer.GetSegmentID()]; !ok || len(binlogs) == 0 {
 		return nil
 	}
+
 	insertLogs := make([]*datapb.FieldBinlog, 0)
-	for _, fieldBinlog := range buffer.flushedBinlogs {
+	for _, fieldBinlog := range buffer.flushedBinlogs[writer.GetSegmentID()] {
 		insertLogs = append(insertLogs, fieldBinlog)
 	}
-	statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.logIDAlloc, writer, buffer.flushedRowNum.Load())
+
+	numRows := buffer.flushedRowNum[writer.GetSegmentID()]
+	statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.logIDAlloc, writer, numRows.Load())
 	if err != nil {
 		return err
 	}
@@ -821,7 +829,7 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
 	seg := &datapb.CompactionSegment{
 		PlanID:              t.plan.GetPlanID(),
 		SegmentID:           writer.GetSegmentID(),
-		NumOfRows:           buffer.flushedRowNum.Load(),
+		NumOfRows:           numRows.Load(),
 		InsertLogs:          insertLogs,
 		Field2StatslogPaths: []*datapb.FieldBinlog{statPaths},
 		Channel:             t.plan.GetChannel(),
@@ -829,11 +837,10 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
 	buffer.uploadedSegments = append(buffer.uploadedSegments, seg)
 	segmentStats := storage.SegmentStats{
 		FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()},
-		NumRows:    int(buffer.flushedRowNum.Load()),
+		NumRows:    int(numRows.Load()),
 	}
 	buffer.uploadedSegmentStats[writer.GetSegmentID()] = segmentStats
-	buffer.flushedBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0)

 	for _, binlog := range seg.InsertLogs {
 		log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", writer.GetSegmentID()), zap.String("binlog", binlog.String()))
 	}
@@ -841,9 +848,9 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
 		zap.Int64("segID", seg.GetSegmentID()),
 		zap.Int64("row num", seg.GetNumOfRows()))

-	// reset
-	buffer.flushedRowNum.Store(0)
-	// set old writer nil
+	// clear segment binlogs cache
+	buffer.flushedBinlogs[writer.GetSegmentID()] = nil
+	//set old writer nil
 	writer = nil
 	return nil
 }
@@ -888,16 +895,22 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus
 		return err
 	}

+	if info, ok := buffer.flushedBinlogs[writer.GetSegmentID()]; !ok || info == nil {
+		buffer.flushedBinlogs[writer.GetSegmentID()] = make(map[typeutil.UniqueID]*datapb.FieldBinlog)
+	}
+
 	for fID, path := range partialBinlogs {
-		tmpBinlog, ok := buffer.flushedBinlogs[fID]
+		tmpBinlog, ok := buffer.flushedBinlogs[writer.GetSegmentID()][fID]
 		if !ok {
 			tmpBinlog = path
 		} else {
 			tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
 		}
-		buffer.flushedBinlogs[fID] = tmpBinlog
+		buffer.flushedBinlogs[writer.GetSegmentID()][fID] = tmpBinlog
 	}
-	buffer.flushedRowNum.Add(writtenRowNum)
+
+	curSegFlushedRowNum := buffer.flushedRowNum[writer.GetSegmentID()]
+	curSegFlushedRowNum.Add(writtenRowNum)
+	buffer.flushedRowNum[writer.GetSegmentID()] = curSegFlushedRowNum

 	// clean buffer with writer
 	buffer.bufferMemorySize.Sub(writtenMemorySize)