fix: Refine sync task field binlog compose logic (#28494)

See also #27675
Since `MetaWriter` needs the `*datapb.FieldBinlog` struct, the sync task now generates `FieldBinlog` entries directly.
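
The per-field grouping now happens while serializing, roughly like the following sketch (stand-in types here, just for illustration; the real code uses `datapb.Binlog`/`datapb.FieldBinlog` and the `appendBinlog` helper shown in the diff below):

type Binlog struct{ LogPath string }

type FieldBinlog struct {
	FieldID int64
	Binlogs []*Binlog
}

// Group serialized binlogs per field up front, so the meta writer can pass
// the FieldBinlog values through without re-wrapping each Binlog.
func appendBinlog(m map[int64]*FieldBinlog, fieldID int64, b *Binlog) {
	fb, ok := m[fieldID]
	if !ok {
		fb = &FieldBinlog{FieldID: fieldID}
		m[fieldID] = fb
	}
	fb.Binlogs = append(fb.Binlogs, b)
}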

Also fixes the merged statslog not being generated when the last sync task has no insert data.
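
Concretely, the flush path no longer bails out when the final batch carries no rows; condensed from the new `serializePkStatsLog` in the diff below:

// Single-batch pk stats are written only when there is insert data,
// while the merged (compound) statslog is written on every flush.
if t.insertData != nil {
	stats, rowNum := t.convertInsertData2PkStats(fieldID, pkField.GetDataType())
	if err := t.serializeSinglePkStats(fieldID, stats, rowNum); err != nil {
		return err
	}
}
if t.isFlush {
	return t.serializeMergedPkStats(fieldID, pkField.GetDataType())
}
return nil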

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/28502/head
congqixia 2023-11-17 14:40:26 +08:00 committed by GitHub
parent 7895ac96b5
commit a3cd0bc9c3
4 changed files with 64 additions and 48 deletions

View File

@@ -37,20 +37,14 @@ func BrokerMetaWriter(broker broker.Broker, opts ...retry.Option) MetaWriter {
 func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
 	var (
-		fieldInsert = []*datapb.FieldBinlog{}
-		fieldStats = []*datapb.FieldBinlog{}
-		deltaInfos = make([]*datapb.FieldBinlog, 0)
-		checkPoints = []*datapb.CheckPoint{}
+		checkPoints = []*datapb.CheckPoint{}
+		deltaFieldBinlogs = []*datapb.FieldBinlog{}
 	)
-	for k, v := range pack.insertBinlogs {
-		fieldInsert = append(fieldInsert, &datapb.FieldBinlog{FieldID: k, Binlogs: []*datapb.Binlog{v}})
-	}
-	for k, v := range pack.statsBinlogs {
-		fieldStats = append(fieldStats, &datapb.FieldBinlog{FieldID: k, Binlogs: []*datapb.Binlog{v}})
-	}
-	if pack.deltaBinlog != nil {
-		deltaInfos = append(deltaInfos, &datapb.FieldBinlog{Binlogs: []*datapb.Binlog{pack.deltaBinlog}})
+	insertFieldBinlogs := lo.MapToSlice(pack.insertBinlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog })
+	statsFieldBinlogs := lo.MapToSlice(pack.statsBinlogs, func(_ int64, fieldBinlog *datapb.FieldBinlog) *datapb.FieldBinlog { return fieldBinlog })
+	if len(pack.deltaBinlog.Binlogs) > 0 {
+		deltaFieldBinlogs = append(deltaFieldBinlogs, pack.deltaBinlog)
 	}
 	// only current segment checkpoint info,
@@ -71,14 +65,15 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
 			StartPosition: info.StartPosition(),
 		}
 	})
+	getBinlogNum := func(fBinlog *datapb.FieldBinlog) int { return len(fBinlog.GetBinlogs()) }
 	log.Info("SaveBinlogPath",
 		zap.Int64("SegmentID", pack.segmentID),
 		zap.Int64("CollectionID", pack.collectionID),
 		zap.Any("startPos", startPos),
 		zap.Any("checkPoints", checkPoints),
-		zap.Int("Length of Field2BinlogPaths", len(fieldInsert)),
-		zap.Int("Length of Field2Stats", len(fieldStats)),
-		// zap.Int("Length of Field2Deltalogs", len(deltaInfos[0].GetBinlogs())),
+		zap.Int("binlogNum", lo.SumBy(insertFieldBinlogs, getBinlogNum)),
+		zap.Int("statslogNum", lo.SumBy(statsFieldBinlogs, getBinlogNum)),
+		zap.Int("deltalogNum", lo.SumBy(deltaFieldBinlogs, getBinlogNum)),
 		zap.String("vChannelName", pack.channelName),
 	)
@@ -90,9 +85,9 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
 		),
 		SegmentID: pack.segmentID,
 		CollectionID: pack.collectionID,
-		Field2BinlogPaths: fieldInsert,
-		Field2StatslogPaths: fieldStats,
-		Deltalogs: deltaInfos,
+		Field2BinlogPaths: insertFieldBinlogs,
+		Field2StatslogPaths: statsFieldBinlogs,
+		Deltalogs: deltaFieldBinlogs,
 		CheckPoints: checkPoints,

View File

@@ -14,8 +14,9 @@ import (
 func NewSyncTask() *SyncTask {
 	return &SyncTask{
 		isFlush: false,
-		insertBinlogs: make(map[int64]*datapb.Binlog),
-		statsBinlogs: make(map[int64]*datapb.Binlog),
+		insertBinlogs: make(map[int64]*datapb.FieldBinlog),
+		statsBinlogs: make(map[int64]*datapb.FieldBinlog),
+		deltaBinlog: &datapb.FieldBinlog{},
 		segmentData: make(map[string][]byte),
 	}
 }

View File

@@ -51,9 +51,9 @@ type SyncTask struct {
 	metacache metacache.MetaCache
 	metaWriter MetaWriter
-	insertBinlogs map[int64]*datapb.Binlog
-	statsBinlogs map[int64]*datapb.Binlog
-	deltaBinlog *datapb.Binlog
+	insertBinlogs map[int64]*datapb.FieldBinlog // map[int64]*datapb.Binlog
+	statsBinlogs map[int64]*datapb.FieldBinlog // map[int64]*datapb.Binlog
+	deltaBinlog *datapb.FieldBinlog
 	segmentData map[string][]byte
@@ -180,7 +180,7 @@ func (t *SyncTask) serializeDeleteData() error {
 	t.segmentData[blobPath] = value
 	data.LogSize = int64(len(blob.Value))
 	data.LogPath = blobPath
-	t.deltaBinlog = data
+	t.appendDeltalog(data)
 	return nil
 }
@@ -219,13 +219,13 @@ func (t *SyncTask) serializeBinlog() error {
 		// [rootPath]/[insert_log]/key
 		key := path.Join(t.chunkManager.RootPath(), common.SegmentInsertLogPath, k)
 		t.segmentData[key] = blob.GetValue()
-		t.insertBinlogs[fieldID] = &datapb.Binlog{
+		t.appendBinlog(fieldID, &datapb.Binlog{
 			EntriesNum: blob.RowNum,
 			TimestampFrom: t.tsFrom,
 			TimestampTo: t.tsTo,
 			LogPath: key,
 			LogSize: int64(memSize[fieldID]),
-		}
+		})
 		logidx += 1
 	}
@@ -260,12 +260,12 @@ func (t *SyncTask) serializeSinglePkStats(fieldID int64, stats *storage.PrimaryK
 	return nil
 }
-func (t *SyncTask) serializeMergedPkStats(fieldID int64, pkType schemapb.DataType, stats *storage.PrimaryKeyStats, rowNum int64) error {
+func (t *SyncTask) serializeMergedPkStats(fieldID int64, pkType schemapb.DataType) error {
 	segments := t.metacache.GetSegmentsBy(metacache.WithSegmentIDs(t.segmentID))
 	var statsList []*storage.PrimaryKeyStats
-	var oldRowNum int64
+	var totalRowNum int64
 	for _, segment := range segments {
-		oldRowNum += segment.NumOfRows()
+		totalRowNum += segment.NumOfRows()
 		statsList = append(statsList, lo.Map(segment.GetHistory(), func(pks *storage.PkStatistics, _ int) *storage.PrimaryKeyStats {
 			return &storage.PrimaryKeyStats{
 				FieldID: fieldID,
@@ -276,15 +276,12 @@ func (t *SyncTask) serializeMergedPkStats(fieldID int64, pkType schemapb.DataTyp
 			}
 		})...)
 	}
-	if stats != nil {
-		statsList = append(statsList, stats)
-	}
-	blob, err := t.getInCodec().SerializePkStatsList(statsList, oldRowNum+rowNum)
+	blob, err := t.getInCodec().SerializePkStatsList(statsList, totalRowNum)
 	if err != nil {
 		return err
 	}
-	t.convertBlob2StatsBinlog(blob, fieldID, int64(storage.CompoundStatsType), oldRowNum+rowNum)
+	t.convertBlob2StatsBinlog(blob, fieldID, int64(storage.CompoundStatsType), totalRowNum)
 	return nil
 }
@@ -295,38 +292,60 @@ func (t *SyncTask) convertBlob2StatsBinlog(blob *storage.Blob, fieldID, logID in
 	value := blob.GetValue()
 	t.segmentData[key] = value
-	t.statsBinlogs[fieldID] = &datapb.Binlog{
+	t.appendStatslog(fieldID, &datapb.Binlog{
 		EntriesNum: rowNum,
 		TimestampFrom: t.tsFrom,
 		TimestampTo: t.tsTo,
 		LogPath: key,
 		LogSize: int64(len(value)),
-	}
+	})
 }
 func (t *SyncTask) serializePkStatsLog() error {
-	if t.insertData == nil {
-		return nil
-	}
 	pkField := lo.FindOrElse(t.schema.GetFields(), nil, func(field *schemapb.FieldSchema) bool { return field.GetIsPrimaryKey() })
 	if pkField == nil {
 		return merr.WrapErrServiceInternal("cannot find pk field")
 	}
 	fieldID := pkField.GetFieldID()
-	stats, rowNum := t.convertInsertData2PkStats(fieldID, pkField.GetDataType())
-	// not flush and not insert data
-	if !t.isFlush && stats == nil {
-		return nil
+	if t.insertData != nil {
+		stats, rowNum := t.convertInsertData2PkStats(fieldID, pkField.GetDataType())
+		err := t.serializeSinglePkStats(fieldID, stats, rowNum)
+		if err != nil {
+			return err
+		}
 	}
 	if t.isFlush {
-		return t.serializeMergedPkStats(fieldID, pkField.GetDataType(), stats, rowNum)
+		return t.serializeMergedPkStats(fieldID, pkField.GetDataType())
 	}
-	return t.serializeSinglePkStats(fieldID, stats, rowNum)
+	return nil
 }
+func (t *SyncTask) appendBinlog(fieldID int64, binlog *datapb.Binlog) {
+	fieldBinlog, ok := t.insertBinlogs[fieldID]
+	if !ok {
+		fieldBinlog = &datapb.FieldBinlog{
+			FieldID: fieldID,
+		}
+		t.insertBinlogs[fieldID] = fieldBinlog
+	}
+	fieldBinlog.Binlogs = append(fieldBinlog.Binlogs, binlog)
+}
+func (t *SyncTask) appendStatslog(fieldID int64, statlog *datapb.Binlog) {
+	fieldBinlog, ok := t.statsBinlogs[fieldID]
+	if !ok {
+		fieldBinlog = &datapb.FieldBinlog{
+			FieldID: fieldID,
+		}
+		t.statsBinlogs[fieldID] = fieldBinlog
+	}
+	fieldBinlog.Binlogs = append(fieldBinlog.Binlogs, statlog)
+}
+func (t *SyncTask) appendDeltalog(deltalog *datapb.Binlog) {
+	t.deltaBinlog.Binlogs = append(t.deltaBinlog.Binlogs, deltalog)
+}
 // writeLogs writes log files (binlog/deltalog/statslog) into storage via chunkManger.

View File

@@ -166,6 +166,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
 	bfs.UpdatePKRange(fd)
 	seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
 	metacache.UpdateNumOfRows(1000)(seg)
+	seg.GetBloomFilterSet().Roll()
 	s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()