mirror of https://github.com/milvus-io/milvus.git
Update segment id for import task (#21583)
Signed-off-by: groot <yihua.mo@zilliz.com>pull/21612/head
parent
7154b5377e
commit
16798f4b2f
|
@ -1235,6 +1235,32 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil
|
|||
zap.Int64("segmentID", segmentID),
|
||||
zap.Int("shard ID", shardID),
|
||||
zap.String("target channel name", targetChName))
|
||||
|
||||
// call report to notify the rootcoord update the segment id list for this task
|
||||
// ignore the returned error, since even report failed the segments still can be cleaned
|
||||
retry.Do(context.Background(), func() error {
|
||||
importResult := &rootcoordpb.ImportResult{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
TaskId: req.GetImportTask().TaskId,
|
||||
DatanodeId: Params.DataNodeCfg.GetNodeID(),
|
||||
State: commonpb.ImportState_ImportStarted,
|
||||
Segments: []int64{segmentID},
|
||||
AutoIds: make([]int64, 0),
|
||||
RowCount: 0,
|
||||
}
|
||||
status, err := node.rootCoord.ReportImport(context.Background(), importResult)
|
||||
if err != nil {
|
||||
log.Error("fail to report import state to RootCoord", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if status != nil && status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return errors.New(status.GetReason())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return segmentID, targetChName, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -595,7 +595,12 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im
|
|||
// Meta persist should be done before memory objs change.
|
||||
toPersistImportTaskInfo = cloneImportTaskInfo(v)
|
||||
toPersistImportTaskInfo.State.StateCode = ir.GetState()
|
||||
toPersistImportTaskInfo.State.Segments = ir.GetSegments()
|
||||
// if is started state, append the new created segment id
|
||||
if v.GetState().GetStateCode() == commonpb.ImportState_ImportStarted {
|
||||
toPersistImportTaskInfo.State.Segments = append(toPersistImportTaskInfo.State.Segments, ir.GetSegments()...)
|
||||
} else {
|
||||
toPersistImportTaskInfo.State.Segments = ir.GetSegments()
|
||||
}
|
||||
toPersistImportTaskInfo.State.RowCount = ir.GetRowCount()
|
||||
toPersistImportTaskInfo.State.RowIds = ir.GetAutoIds()
|
||||
for _, kv := range ir.GetInfos() {
|
||||
|
@ -606,6 +611,8 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.Im
|
|||
toPersistImportTaskInfo.Infos = append(toPersistImportTaskInfo.Infos, kv)
|
||||
}
|
||||
}
|
||||
log.Info("importManager update task info", zap.Any("toPersistImportTaskInfo", toPersistImportTaskInfo))
|
||||
|
||||
// Update task in task store.
|
||||
if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil {
|
||||
log.Error("failed to update import task",
|
||||
|
|
Loading…
Reference in New Issue