mirror of https://github.com/milvus-io/milvus.git
fix: [Cherry-pick] BM25 import segments lose stats (#38881)
relate: https://github.com/milvus-io/milvus/issues/38854
pr: https://github.com/milvus-io/milvus/pull/38855
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/38822/head
parent
e389ae215c
commit
6fa096eb39
|
@ -314,13 +314,13 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
|
|||
if resp.GetState() == datapb.ImportTaskStateV2_Completed {
|
||||
for _, info := range resp.GetImportSegmentsInfo() {
|
||||
// try to parse path and fill logID
|
||||
err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetDeltalogs(), info.GetStatslogs())
|
||||
err = binlog.CompressBinLogs(info.GetBinlogs(), info.GetDeltalogs(), info.GetStatslogs(), info.GetBm25Logs())
|
||||
if err != nil {
|
||||
log.Warn("fail to CompressBinLogs for import binlogs",
|
||||
WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...)
|
||||
return
|
||||
}
|
||||
op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs())
|
||||
op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs(), info.GetBm25Logs())
|
||||
op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
|
||||
err = s.meta.UpdateSegmentsInfo(context.TODO(), op1, op2)
|
||||
if err != nil {
|
||||
|
|
|
@ -921,7 +921,7 @@ func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs
|
|||
}
|
||||
}
|
||||
|
||||
func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*datapb.FieldBinlog) UpdateOperator {
|
||||
func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs []*datapb.FieldBinlog) UpdateOperator {
|
||||
return func(modPack *updateSegmentPack) bool {
|
||||
segment := modPack.Get(segmentID)
|
||||
if segment == nil {
|
||||
|
@ -933,6 +933,7 @@ func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*dat
|
|||
segment.Binlogs = binlogs
|
||||
segment.Statslogs = statslogs
|
||||
segment.Deltalogs = deltalogs
|
||||
segment.Bm25Statslogs = bm25logs
|
||||
modPack.increments[segmentID] = metastore.BinlogsIncrement{
|
||||
Segment: segment.SegmentInfo,
|
||||
}
|
||||
|
|
|
@ -840,7 +840,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
|
|||
|
||||
err = meta.UpdateSegmentsInfo(
|
||||
context.TODO(),
|
||||
UpdateBinlogsOperator(1, nil, nil, nil),
|
||||
UpdateBinlogsOperator(1, nil, nil, nil, nil),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
|
||||
|
|
|
@ -143,6 +143,7 @@ func UpdateSegmentInfo(info *datapb.ImportSegmentInfo) UpdateAction {
|
|||
segmentsInfo[segment].Binlogs = mergeFn(segmentsInfo[segment].Binlogs, info.GetBinlogs())
|
||||
segmentsInfo[segment].Statslogs = mergeFn(segmentsInfo[segment].Statslogs, info.GetStatslogs())
|
||||
segmentsInfo[segment].Deltalogs = mergeFn(segmentsInfo[segment].Deltalogs, info.GetDeltalogs())
|
||||
segmentsInfo[segment].Bm25Logs = mergeFn(segmentsInfo[segment].Bm25Logs, info.GetBm25Logs())
|
||||
return
|
||||
}
|
||||
segmentsInfo[segment] = info
|
||||
|
|
|
@ -105,7 +105,7 @@ func NewSyncTask(ctx context.Context,
|
|||
|
||||
func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache.MetaCache) (*datapb.ImportSegmentInfo, error) {
|
||||
segmentID := syncTask.SegmentID()
|
||||
insertBinlogs, statsBinlog, deltaLog := syncTask.(*syncmgr.SyncTask).Binlogs()
|
||||
insertBinlogs, statsBinlog, deltaLog, bm25Log := syncTask.(*syncmgr.SyncTask).Binlogs()
|
||||
metaCache := metaCaches[syncTask.ChannelName()]
|
||||
segment, ok := metaCache.GetSegmentByID(segmentID)
|
||||
if !ok {
|
||||
|
@ -120,6 +120,7 @@ func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache
|
|||
ImportedRows: segment.FlushedRows(),
|
||||
Binlogs: lo.Values(insertBinlogs),
|
||||
Statslogs: lo.Values(statsBinlog),
|
||||
Bm25Logs: lo.Values(bm25Log),
|
||||
Deltalogs: deltaLogs,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -414,8 +414,8 @@ func (t *SyncTask) IsFlush() bool {
|
|||
return t.isFlush
|
||||
}
|
||||
|
||||
func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) {
|
||||
return t.insertBinlogs, t.statsBinlogs, t.deltaBinlog
|
||||
func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog, map[int64]*datapb.FieldBinlog) {
|
||||
return t.insertBinlogs, t.statsBinlogs, t.deltaBinlog, t.bm25Binlogs
|
||||
}
|
||||
|
||||
func (t *SyncTask) MarshalJSON() ([]byte, error) {
|
||||
|
|
|
@ -850,6 +850,7 @@ message ImportSegmentInfo {
|
|||
repeated FieldBinlog binlogs = 3;
|
||||
repeated FieldBinlog statslogs = 4;
|
||||
repeated FieldBinlog deltalogs = 5;
|
||||
repeated FieldBinlog bm25logs = 6;
|
||||
}
|
||||
|
||||
message QueryImportResponse {
|
||||
|
|
|
@ -122,7 +122,7 @@ func (c *channelLifetime) Run() error {
|
|||
return
|
||||
}
|
||||
if tt, ok := t.(*syncmgr.SyncTask); ok {
|
||||
insertLogs, _, _ := tt.Binlogs()
|
||||
insertLogs, _, _, _ := tt.Binlogs()
|
||||
resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(), stats.SyncOperationMetrics{
|
||||
BinLogCounterIncr: 1,
|
||||
BinLogFileCounterIncr: uint64(len(insertLogs)),
|
||||
|
|
Loading…
Reference in New Issue