fix: Fix the issue of missing stats log after clustering compaction (#35266)

issue: #35265

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
pull/35369/head
cai.zhang 2024-08-08 14:24:17 +08:00 committed by GitHub
parent 626b1b2f5e
commit aaab827a16
5 changed files with 249 additions and 44 deletions


@@ -134,12 +134,16 @@ func (sss *SyncSegmentsScheduler) SyncSegments(collectionID, partitionID int64,
Level: seg.GetLevel(),
NumOfRows: seg.GetNumOfRows(),
}
statsLogs := make([]*datapb.Binlog, 0)
for _, statsLog := range seg.GetStatslogs() {
if statsLog.GetFieldID() == pkFieldID {
req.SegmentInfos[seg.ID].PkStatsLog = statsLog
break
statsLogs = append(statsLogs, statsLog.GetBinlogs()...)
}
}
req.SegmentInfos[seg.ID].PkStatsLog = &datapb.FieldBinlog{
FieldID: pkFieldID,
Binlogs: statsLogs,
}
}
if err := sss.sessions.SyncSegments(nodeID, req); err != nil {
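Before the fix, the loop above kept only the first stats log whose field ID matched the primary key and then broke out; with this change it merges the binlogs of every matching stats log into one FieldBinlog. A minimal sketch of that aggregation, using a hypothetical helper name and assuming the datapb types already imported by this file:

func mergePkStatsLogs(statslogs []*datapb.FieldBinlog, pkFieldID int64) *datapb.FieldBinlog {
	// Collect the binlogs of every stats log that belongs to the primary key field.
	merged := make([]*datapb.Binlog, 0)
	for _, statsLog := range statslogs {
		if statsLog.GetFieldID() == pkFieldID {
			merged = append(merged, statsLog.GetBinlogs()...)
		}
	}
	// Wrap them in a single FieldBinlog keyed by the pk field, as the hunk above does.
	return &datapb.FieldBinlog{
		FieldID: pkFieldID,
		Binlogs: merged,
	}
}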


@@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/proto/clusteringpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@@ -592,12 +593,12 @@ func (t *clusteringCompactionTask) mappingSegment(
remained++
if (remained+1)%100 == 0 {
currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize()
// trigger flushBinlog
t.clusterBufferLocks.RLock(clusterBuffer.id)
currentBufferWriterFull := clusterBuffer.writer.IsFull()
currentBufferWriterFull := clusterBuffer.writer.FlushAndIsFull()
t.clusterBufferLocks.RUnlock(clusterBuffer.id)
currentBufferTotalMemorySize := t.getBufferTotalUsedMemorySize()
currentSegmentNumRows := clusterBuffer.currentSegmentRowNum.Load()
if currentSegmentNumRows > t.plan.GetMaxSegmentRows() || currentBufferWriterFull {
// reach segment/binlog max size
@@ -823,18 +824,29 @@ func (t *clusteringCompactionTask) flushAll(ctx context.Context) error {
return nil
}
func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter) error {
if binlogs, ok := buffer.flushedBinlogs[writer.GetSegmentID()]; !ok || len(binlogs) == 0 {
func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buffer *ClusterBuffer, segmentID int64) error {
if binlogs, ok := buffer.flushedBinlogs[segmentID]; !ok || len(binlogs) == 0 {
return nil
}
binlogNum := 0
numRows := buffer.flushedRowNum[segmentID]
insertLogs := make([]*datapb.FieldBinlog, 0)
for _, fieldBinlog := range buffer.flushedBinlogs[writer.GetSegmentID()] {
for _, fieldBinlog := range buffer.flushedBinlogs[segmentID] {
insertLogs = append(insertLogs, fieldBinlog)
binlogNum = len(fieldBinlog.GetBinlogs())
}
numRows := buffer.flushedRowNum[writer.GetSegmentID()]
statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.logIDAlloc, writer, numRows.Load())
fieldBinlogPaths := make([][]string, 0)
for idx := 0; idx < binlogNum; idx++ {
var ps []string
for _, fieldID := range []int64{t.primaryKeyField.GetFieldID(), common.RowIDField, common.TimeStampField} {
ps = append(ps, buffer.flushedBinlogs[segmentID][fieldID].GetBinlogs()[idx].GetLogPath())
}
fieldBinlogPaths = append(fieldBinlogPaths, ps)
}
statsLogs, err := t.generatePkStats(ctx, segmentID, numRows.Load(), fieldBinlogPaths)
if err != nil {
return err
}
@@ -842,10 +854,10 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
// pack current flushBinlog data into a segment
seg := &datapb.CompactionSegment{
PlanID: t.plan.GetPlanID(),
SegmentID: writer.GetSegmentID(),
SegmentID: segmentID,
NumOfRows: numRows.Load(),
InsertLogs: insertLogs,
Field2StatslogPaths: []*datapb.FieldBinlog{statPaths},
Field2StatslogPaths: []*datapb.FieldBinlog{statsLogs},
Channel: t.plan.GetChannel(),
}
buffer.uploadedSegments = append(buffer.uploadedSegments, seg)
@@ -853,35 +865,44 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff
FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()},
NumRows: int(numRows.Load()),
}
buffer.uploadedSegmentStats[writer.GetSegmentID()] = segmentStats
buffer.uploadedSegmentStats[segmentID] = segmentStats
for _, binlog := range seg.InsertLogs {
log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", writer.GetSegmentID()), zap.String("binlog", binlog.String()))
log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID),
zap.Int64("segID", segmentID), zap.String("binlog", binlog.String()))
}
for _, statsLog := range seg.Field2StatslogPaths {
log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID),
zap.Int64("segID", segmentID), zap.String("binlog", statsLog.String()))
}
log.Debug("finish pack segment", zap.Int64("partitionID", t.partitionID),
zap.Int64("segID", seg.GetSegmentID()),
zap.Int64("row num", seg.GetNumOfRows()))
// clear segment binlogs cache
delete(buffer.flushedBinlogs, writer.GetSegmentID())
// set old writer nil
writer = nil
delete(buffer.flushedBinlogs, segmentID)
return nil
}
func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer, writer *SegmentWriter, pack bool) error {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("flushBinlog-%d", writer.GetSegmentID()))
segmentID := writer.GetSegmentID()
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("flushBinlog-%d", segmentID))
defer span.End()
if writer == nil {
log.Warn("buffer writer is nil, please check", zap.Int("buffer id", buffer.id))
return fmt.Errorf("buffer: %d writer is nil, please check", buffer.id)
}
defer func() {
// set old writer nil
writer = nil
}()
buffer.flushLock.Lock()
defer buffer.flushLock.Unlock()
writtenMemorySize := int64(writer.WrittenMemorySize())
writtenRowNum := writer.GetRowNum()
log := log.With(zap.Int("bufferID", buffer.id),
zap.Int64("segmentID", writer.GetSegmentID()),
zap.Int64("segmentID", segmentID),
zap.Bool("pack", pack),
zap.Int64("writerRowNum", writtenRowNum),
zap.Int64("writtenMemorySize", writtenMemorySize),
@@ -892,7 +913,7 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus
if writtenRowNum <= 0 {
log.Debug("writerRowNum is zero, skip flush")
if pack {
return t.packBufferToSegment(ctx, buffer, writer)
return t.packBufferToSegment(ctx, buffer, segmentID)
}
return nil
}
@@ -909,29 +930,30 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus
return err
}
if info, ok := buffer.flushedBinlogs[writer.GetSegmentID()]; !ok || info == nil {
buffer.flushedBinlogs[writer.GetSegmentID()] = make(map[typeutil.UniqueID]*datapb.FieldBinlog)
if info, ok := buffer.flushedBinlogs[segmentID]; !ok || info == nil {
buffer.flushedBinlogs[segmentID] = make(map[typeutil.UniqueID]*datapb.FieldBinlog)
}
for fID, path := range partialBinlogs {
tmpBinlog, ok := buffer.flushedBinlogs[writer.GetSegmentID()][fID]
tmpBinlog, ok := buffer.flushedBinlogs[segmentID][fID]
if !ok {
tmpBinlog = path
} else {
tmpBinlog.Binlogs = append(tmpBinlog.Binlogs, path.GetBinlogs()...)
}
buffer.flushedBinlogs[writer.GetSegmentID()][fID] = tmpBinlog
buffer.flushedBinlogs[segmentID][fID] = tmpBinlog
}
curSegFlushedRowNum := buffer.flushedRowNum[writer.GetSegmentID()]
curSegFlushedRowNum := buffer.flushedRowNum[segmentID]
curSegFlushedRowNum.Add(writtenRowNum)
buffer.flushedRowNum[writer.GetSegmentID()] = curSegFlushedRowNum
buffer.flushedRowNum[segmentID] = curSegFlushedRowNum
// clean buffer with writer
buffer.bufferMemorySize.Sub(writtenMemorySize)
t.flushCount.Inc()
if pack {
if err := t.packBufferToSegment(ctx, buffer, writer); err != nil {
if err := t.packBufferToSegment(ctx, buffer, segmentID); err != nil {
return err
}
}
@@ -1220,3 +1242,47 @@ func (t *clusteringCompactionTask) checkBuffersAfterCompaction() error {
}
return nil
}
func (t *clusteringCompactionTask) generatePkStats(ctx context.Context, segmentID int64,
numRows int64, binlogPaths [][]string) (*datapb.FieldBinlog, error) {
stats, err := storage.NewPrimaryKeyStats(t.primaryKeyField.GetFieldID(), int64(t.primaryKeyField.GetDataType()), numRows)
if err != nil {
return nil, err
}
for _, path := range binlogPaths {
bytesArr, err := t.binlogIO.Download(ctx, path)
if err != nil {
log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err))
return nil, err
}
blobs := make([]*storage.Blob, len(bytesArr))
for i := range bytesArr {
blobs[i] = &storage.Blob{Value: bytesArr[i]}
}
pkIter, err := storage.NewInsertBinlogIterator(blobs, t.primaryKeyField.GetFieldID(), t.primaryKeyField.GetDataType())
if err != nil {
log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err))
return nil, err
}
for pkIter.HasNext() {
vIter, _ := pkIter.Next()
v, ok := vIter.(*storage.Value)
if !ok {
log.Warn("transfer interface to Value wrong", zap.Strings("path", path))
return nil, errors.New("unexpected error")
}
stats.Update(v.PK)
}
}
codec := storage.NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: t.collectionID, Schema: t.plan.GetSchema()})
sblob, err := codec.SerializePkStats(stats, numRows)
if err != nil {
return nil, err
}
return uploadStatsBlobs(ctx, t.collectionID, t.partitionID, segmentID, t.primaryKeyField.GetFieldID(), numRows, t.binlogIO, t.logIDAlloc, sblob)
}


@@ -18,6 +18,7 @@ package compaction
import (
"context"
"fmt"
"testing"
"time"
@@ -37,6 +38,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func TestClusteringCompactionTaskSuite(t *testing.T) {
@@ -172,8 +174,7 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
var segmentID int64 = 1001
segWriter, err := NewSegmentWriter(schema, 1000, segmentID, PartitionID, CollectionID)
s.Require().NoError(err)
for i := 0; i < 1000; i++ {
for i := 0; i < 10240; i++ {
v := storage.Value{
PK: storage.NewInt64PrimaryKey(int64(i)),
Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
@@ -189,24 +190,153 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 100,
SegmentID: segmentID,
FieldBinlogs: lo.Values(fBinlogs),
},
}
s.task.plan.Schema = genCollectionSchema()
s.task.plan.ClusteringKeyField = 100
s.task.plan.PreferSegmentRows = 100
s.task.plan.MaxSegmentRows = 200
s.task.plan.PreferSegmentRows = 2048
s.task.plan.MaxSegmentRows = 2048
s.task.plan.PreAllocatedSegments = &datapb.IDRange{
Begin: time.Now().UnixMilli(),
End: time.Now().UnixMilli() + 1000,
}
// 8+8+8+4+7+4*4=51
// 51*1024 = 52224
// writer will automatically flush after 1024 rows.
paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "52223")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)
compactionResult, err := s.task.Compact()
s.Require().NoError(err)
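// Rough expectation, derived from the setup above: 10240 rows capped at 2048
// rows per segment yields 5 cluster buffers and 5 result segments; with the
// writer flushing roughly every 1024 rows, each ~2048-row segment carries 2
// insert binlogs per field, and the single pk stats binlog's entry count
// should match the segment's total row count.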
s.Equal(10, len(s.task.clusterBuffers))
s.Equal(10, len(compactionResult.GetSegments()))
s.Equal(5, len(s.task.clusterBuffers))
s.Equal(5, len(compactionResult.GetSegments()))
totalBinlogNum := 0
totalRowNum := int64(0)
for _, fb := range compactionResult.GetSegments()[0].GetInsertLogs() {
for _, b := range fb.GetBinlogs() {
totalBinlogNum++
if fb.GetFieldID() == 100 {
totalRowNum += b.GetEntriesNum()
}
}
}
statsBinlogNum := 0
statsRowNum := int64(0)
for _, sb := range compactionResult.GetSegments()[0].GetField2StatslogPaths() {
for _, b := range sb.GetBinlogs() {
statsBinlogNum++
statsRowNum += b.GetEntriesNum()
}
}
s.Equal(2, totalBinlogNum/len(schema.GetFields()))
s.Equal(1, statsBinlogNum)
s.Equal(totalRowNum, statsRowNum)
}
func (s *ClusteringCompactionTaskSuite) TestCheckBuffersAfterCompaction() {
s.Run("no leak", func() {
task := &clusteringCompactionTask{clusterBuffers: []*ClusterBuffer{{}}}
s.NoError(task.checkBuffersAfterCompaction())
})
s.Run("leak binlog", func() {
task := &clusteringCompactionTask{
clusterBuffers: []*ClusterBuffer{
{
flushedBinlogs: map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog{
1: {
101: {
FieldID: 101,
Binlogs: []*datapb.Binlog{{LogID: 1000}},
},
},
},
},
},
}
s.Error(task.checkBuffersAfterCompaction())
})
}
func (s *ClusteringCompactionTaskSuite) TestGeneratePkStats() {
pkField := &schemapb.FieldSchema{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
}
s.Run("num rows zero", func() {
task := &clusteringCompactionTask{
primaryKeyField: pkField,
}
binlogs, err := task.generatePkStats(context.Background(), 1, 0, nil)
s.Error(err)
s.Nil(binlogs)
})
s.Run("download binlogs failed", func() {
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error"))
task := &clusteringCompactionTask{
binlogIO: s.mockBinlogIO,
primaryKeyField: pkField,
}
binlogs, err := task.generatePkStats(context.Background(), 1, 100, [][]string{{"abc", "def"}})
s.Error(err)
s.Nil(binlogs)
})
s.Run("NewInsertBinlogIterator failed", func() {
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{[]byte("mock")}, nil)
task := &clusteringCompactionTask{
binlogIO: s.mockBinlogIO,
primaryKeyField: pkField,
}
binlogs, err := task.generatePkStats(context.Background(), 1, 100, [][]string{{"abc", "def"}})
s.Error(err)
s.Nil(binlogs)
})
s.Run("upload failed", func() {
schema := genCollectionSchema()
segWriter, err := NewSegmentWriter(schema, 1000, SegmentID, PartitionID, CollectionID)
s.Require().NoError(err)
for i := 0; i < 2000; i++ {
v := storage.Value{
PK: storage.NewInt64PrimaryKey(int64(i)),
Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
Value: genRow(int64(i)),
}
err = segWriter.Write(&v)
s.Require().NoError(err)
}
segWriter.writer.Flush()
kvs, _, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
s.NoError(err)
mockBinlogIO := io.NewMockBinlogIO(s.T())
mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(lo.Values(kvs), nil)
mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error"))
task := &clusteringCompactionTask{
collectionID: CollectionID,
partitionID: PartitionID,
plan: &datapb.CompactionPlan{
Schema: genCollectionSchema(),
},
binlogIO: mockBinlogIO,
primaryKeyField: pkField,
logIDAlloc: s.mockAlloc,
}
binlogs, err := task.generatePkStats(context.Background(), 1, 100, [][]string{{"abc", "def"}})
s.Error(err)
s.Nil(binlogs)
})
}
func genRow(magic int64) map[int64]interface{} {


@@ -166,29 +166,34 @@ func serializeWrite(ctx context.Context, allocator allocator.Interface, writer *
return
}
func statSerializeWrite(ctx context.Context, io io.BinlogIO, allocator allocator.Interface, writer *SegmentWriter, finalRowCount int64) (*datapb.FieldBinlog, error) {
func statSerializeWrite(ctx context.Context, io io.BinlogIO, allocator allocator.Interface, writer *SegmentWriter) (*datapb.FieldBinlog, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "statslog serializeWrite")
defer span.End()
sblob, err := writer.Finish(finalRowCount)
sblob, err := writer.Finish(writer.GetRowNum())
if err != nil {
return nil, err
}
return uploadStatsBlobs(ctx, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), writer.GetPkID(), writer.GetRowNum(), io, allocator, sblob)
}
func uploadStatsBlobs(ctx context.Context, collectionID, partitionID, segmentID, pkID, numRows int64,
io io.BinlogIO, allocator allocator.Interface, blob *storage.Blob) (*datapb.FieldBinlog, error) {
logID, err := allocator.AllocOne()
if err != nil {
return nil, err
}
key, _ := binlog.BuildLogPath(storage.StatsBinlog, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), writer.GetPkID(), logID)
kvs := map[string][]byte{key: sblob.GetValue()}
key, _ := binlog.BuildLogPath(storage.StatsBinlog, collectionID, partitionID, segmentID, pkID, logID)
kvs := map[string][]byte{key: blob.GetValue()}
statFieldLog := &datapb.FieldBinlog{
FieldID: writer.GetPkID(),
FieldID: pkID,
Binlogs: []*datapb.Binlog{
{
LogSize: int64(len(sblob.GetValue())),
MemorySize: int64(len(sblob.GetValue())),
LogSize: int64(len(blob.GetValue())),
MemorySize: int64(len(blob.GetValue())),
LogPath: key,
EntriesNum: finalRowCount,
EntriesNum: numRows,
},
},
}
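The refactor above splits the old statSerializeWrite into two pieces: the writer-based serialization keeps its name (and now takes the row count from the writer itself), while log-ID allocation, path building, and upload move into uploadStatsBlobs so clustering compaction can reuse them. Condensed from the hunks above, the two call sites now look like this:

// In statSerializeWrite (mix compaction): the writer supplies all identifiers.
return uploadStatsBlobs(ctx, writer.GetCollectionID(), writer.GetPartitionID(), writer.GetSegmentID(), writer.GetPkID(), writer.GetRowNum(), io, allocator, sblob)

// In generatePkStats (clustering compaction): identifiers come from the task, and the stats are rebuilt from flushed binlogs.
return uploadStatsBlobs(ctx, t.collectionID, t.partitionID, segmentID, t.primaryKeyField.GetFieldID(), numRows, t.binlogIO, t.logIDAlloc, sblob)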


@@ -244,7 +244,7 @@ func (t *mixCompactionTask) merge(
}
serWriteStart := time.Now()
sPath, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, writer, remainingRowCount)
sPath, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, writer)
if err != nil {
log.Warn("compact wrong, failed to serialize write segment stats",
zap.Int64("remaining row count", remainingRowCount), zap.Error(err))