fix: bm25 import segment without bm25 stats meta (#38855)

relate: https://github.com/milvus-io/milvus/issues/38854

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/39489/head
aoiasd 2025-01-21 11:09:04 +08:00 committed by GitHub
parent 341d6c1eb7
commit 9cb4c4e8ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 915 additions and 899 deletions

View File

@ -314,13 +314,13 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
if resp.GetState() == datapb.ImportTaskStateV2_Completed {
for _, info := range resp.GetImportSegmentsInfo() {
// try to parse path and fill logID
err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetDeltalogs(), info.GetStatslogs())
err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetDeltalogs(), info.GetStatslogs(), info.GetBm25Logs())
if err != nil {
log.Warn("fail to CompressBinLogs for import binlogs",
WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...)
return
}
op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs())
op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs(), info.GetBm25Logs())
op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
err = s.meta.UpdateSegmentsInfo(context.TODO(), op1, op2)
if err != nil {

View File

@ -919,7 +919,7 @@ func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs
}
}
func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*datapb.FieldBinlog) UpdateOperator {
func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs []*datapb.FieldBinlog) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
@ -931,6 +931,7 @@ func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*dat
segment.Binlogs = binlogs
segment.Statslogs = statslogs
segment.Deltalogs = deltalogs
segment.Bm25Statslogs = bm25logs
modPack.increments[segmentID] = metastore.BinlogsIncrement{
Segment: segment.SegmentInfo,
}

View File

@ -840,7 +840,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
err = meta.UpdateSegmentsInfo(
context.TODO(),
UpdateBinlogsOperator(1, nil, nil, nil),
UpdateBinlogsOperator(1, nil, nil, nil, nil),
)
assert.NoError(t, err)

View File

@ -143,6 +143,7 @@ func UpdateSegmentInfo(info *datapb.ImportSegmentInfo) UpdateAction {
segmentsInfo[segment].Binlogs = mergeFn(segmentsInfo[segment].Binlogs, info.GetBinlogs())
segmentsInfo[segment].Statslogs = mergeFn(segmentsInfo[segment].Statslogs, info.GetStatslogs())
segmentsInfo[segment].Deltalogs = mergeFn(segmentsInfo[segment].Deltalogs, info.GetDeltalogs())
segmentsInfo[segment].Bm25Logs = mergeFn(segmentsInfo[segment].Bm25Logs, info.GetBm25Logs())
return
}
segmentsInfo[segment] = info

View File

@ -105,7 +105,7 @@ func NewSyncTask(ctx context.Context,
func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache.MetaCache) (*datapb.ImportSegmentInfo, error) {
segmentID := syncTask.SegmentID()
insertBinlogs, statsBinlog, deltaLog := syncTask.(*syncmgr.SyncTask).Binlogs()
insertBinlogs, statsBinlog, deltaLog, bm25Log := syncTask.(*syncmgr.SyncTask).Binlogs()
metaCache := metaCaches[syncTask.ChannelName()]
segment, ok := metaCache.GetSegmentByID(segmentID)
if !ok {
@ -120,6 +120,7 @@ func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache
ImportedRows: segment.FlushedRows(),
Binlogs: lo.Values(insertBinlogs),
Statslogs: lo.Values(statsBinlog),
Bm25Logs: lo.Values(bm25Log),
Deltalogs: deltaLogs,
}, nil
}

View File

@ -414,8 +414,8 @@ func (t *SyncTask) IsFlush() bool {
return t.isFlush
}
func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) {
return t.insertBinlogs, t.statsBinlogs, t.deltaBinlog
func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog, map[int64]*datapb.FieldBinlog) {
return t.insertBinlogs, t.statsBinlogs, t.deltaBinlog, t.bm25Binlogs
}
func (t *SyncTask) MarshalJSON() ([]byte, error) {

View File

@ -131,7 +131,7 @@ func (c *channelLifetime) Run() error {
return
}
if tt, ok := t.(*syncmgr.SyncTask); ok {
insertLogs, _, _ := tt.Binlogs()
insertLogs, _, _, _ := tt.Binlogs()
resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(), stats.SyncOperationMetrics{
BinLogCounterIncr: 1,
BinLogFileCounterIncr: uint64(len(insertLogs)),

View File

@ -850,6 +850,7 @@ message ImportSegmentInfo {
repeated FieldBinlog binlogs = 3;
repeated FieldBinlog statslogs = 4;
repeated FieldBinlog deltalogs = 5;
repeated FieldBinlog bm25logs = 6;
}
message QueryImportResponse {

File diff suppressed because it is too large Load Diff