enhance: Return more fields in import progress response (#31539) (#31561)

Return more fields in the import progress response, including importedRows and
totalRows. Additionally, ensure compatibility with the old import
progress response by retaining the create timestamp and row count fields.

issue: https://github.com/milvus-io/milvus/issues/31448
https://github.com/milvus-io/milvus/issues/31237
https://github.com/milvus-io/milvus/issues/28521

pr: https://github.com/milvus-io/milvus/pull/31539

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/31572/head
yihao.dai 2024-03-25 14:27:07 +08:00 committed by GitHub
parent 6e0baa47e2
commit 8858f8d612
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 71 additions and 29 deletions

View File

@ -82,6 +82,7 @@ type ImportJob interface {
GetState() internalpb.ImportJobState
GetReason() string
GetRequestedDiskSize() int64
GetStartTime() string
GetCompleteTime() string
GetFiles() []*internalpb.ImportFile
GetOptions() []*commonpb.KeyValuePair

View File

@ -308,7 +308,7 @@ func getPreImportingProgress(jobID int64, imeta ImportMeta) float32 {
return float32(len(completedTasks)) / float32(len(tasks))
}
func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 {
func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) (float32, int64, int64) {
var (
importedRows int64
totalRows int64
@ -349,31 +349,38 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 {
if totalSegment != 0 {
completedProgress = float32(unsetIsImportingSegment) / float32(totalSegment)
}
return importingProgress*0.8 + completedProgress*0.2
return importingProgress*0.8 + completedProgress*0.2, importedRows, totalRows
}
func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, string) {
func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, int64, int64, string) {
job := imeta.GetJob(jobID)
switch job.GetState() {
case internalpb.ImportJobState_Pending:
progress := getPendingProgress(jobID, imeta)
return int64(progress * 10), internalpb.ImportJobState_Pending, ""
return int64(progress * 10), internalpb.ImportJobState_Pending, 0, 0, ""
case internalpb.ImportJobState_PreImporting:
progress := getPreImportingProgress(jobID, imeta)
return 10 + int64(progress*40), internalpb.ImportJobState_Importing, ""
return 10 + int64(progress*40), internalpb.ImportJobState_Importing, 0, 0, ""
case internalpb.ImportJobState_Importing:
progress := getImportingProgress(jobID, imeta, meta)
return 10 + 40 + int64(progress*50), internalpb.ImportJobState_Importing, ""
progress, importedRows, totalRows := getImportingProgress(jobID, imeta, meta)
return 10 + 40 + int64(progress*50), internalpb.ImportJobState_Importing, importedRows, totalRows, ""
case internalpb.ImportJobState_Completed:
return 100, internalpb.ImportJobState_Completed, ""
totalRows := int64(0)
tasks := imeta.GetTaskBy(WithJob(jobID), WithType(ImportTaskType))
for _, task := range tasks {
totalRows += lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 {
return file.GetTotalRows()
})
}
return 100, internalpb.ImportJobState_Completed, totalRows, totalRows, ""
case internalpb.ImportJobState_Failed:
return 0, internalpb.ImportJobState_Failed, job.GetReason()
return 0, internalpb.ImportJobState_Failed, 0, 0, job.GetReason()
}
return 0, internalpb.ImportJobState_None, "unknown import job state"
return 0, internalpb.ImportJobState_None, 0, 0, "unknown import job state"
}
func GetTaskProgresses(jobID int64, imeta ImportMeta, meta *meta) []*internalpb.ImportTaskProgress {
@ -395,6 +402,9 @@ func GetTaskProgresses(jobID int64, imeta ImportMeta, meta *meta) []*internalpb.
Reason: task.GetReason(),
Progress: progress,
CompleteTime: task.(*importTask).GetCompleteTime(),
State: task.GetState().String(),
ImportedRows: progress * fileStat.GetTotalRows() / 100,
TotalRows: fileStat.GetTotalRows(),
})
}
}

View File

@ -500,7 +500,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
// failed state
err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(mockErr))
assert.NoError(t, err)
progress, state, reason := GetJobProgress(job.GetJobID(), imeta, meta)
progress, state, _, _, reason := GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(0), progress)
assert.Equal(t, internalpb.ImportJobState_Failed, state)
assert.Equal(t, mockErr, reason)
@ -508,7 +508,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
// pending state
err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Pending))
assert.NoError(t, err)
progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(10), progress)
assert.Equal(t, internalpb.ImportJobState_Pending, state)
assert.Equal(t, "", reason)
@ -516,7 +516,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
// preImporting state
err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
assert.NoError(t, err)
progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(10+40), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason)
@ -524,7 +524,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
// importing state, segmentImportedRows/totalRows = 0.5
err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing))
assert.NoError(t, err)
progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(10+40+40*0.5), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason)
@ -546,7 +546,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
assert.NoError(t, err)
err = meta.UpdateSegmentsInfo(UpdateImportedRows(22, 100))
assert.NoError(t, err)
progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(float32(10+40+40+10*2/6)), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason)
@ -560,7 +560,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
assert.NoError(t, err)
err = meta.UpdateSegmentsInfo(UpdateIsImporting(22, false))
assert.NoError(t, err)
progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(10+40+40+10), progress)
assert.Equal(t, internalpb.ImportJobState_Importing, state)
assert.Equal(t, "", reason)
@ -568,7 +568,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
// completed state
err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed))
assert.NoError(t, err)
progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta)
progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
assert.Equal(t, int64(100), progress)
assert.Equal(t, internalpb.ImportJobState_Completed, state)
assert.Equal(t, "", reason)

View File

@ -1696,6 +1696,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
State: internalpb.ImportJobState_Pending,
Files: files,
Options: in.GetOptions(),
StartTime: time.Now().Format("2006-01-02T15:04:05Z07:00"),
},
}
err = s.importMeta.AddJob(job)
@ -1726,12 +1727,15 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport
return resp, nil
}
job := s.importMeta.GetJob(jobID)
progress, state, reason := GetJobProgress(jobID, s.importMeta, s.meta)
progress, state, importedRows, totalRows, reason := GetJobProgress(jobID, s.importMeta, s.meta)
resp.State = state
resp.Reason = reason
resp.Progress = progress
resp.CollectionName = job.GetCollectionName()
resp.StartTime = job.GetStartTime()
resp.CompleteTime = job.GetCompleteTime()
resp.ImportedRows = importedRows
resp.TotalRows = totalRows
resp.TaskProgresses = GetTaskProgresses(jobID, s.importMeta, s.meta)
log.Info("GetImportProgress done", zap.Any("resp", resp))
return resp, nil
@ -1760,7 +1764,7 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq
}
for _, job := range jobs {
progress, state, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta)
progress, state, _, _, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta)
resp.JobIDs = append(resp.JobIDs, fmt.Sprintf("%d", job.GetJobID()))
resp.States = append(resp.States, state)
resp.Reasons = append(resp.Reasons, reason)

View File

@ -1821,25 +1821,34 @@ func (h *HandlersV2) getImportJobProcess(ctx context.Context, c *gin.Context, an
returnData := make(map[string]interface{})
returnData["jobId"] = jobIDGetter.GetJobID()
returnData["collectionName"] = response.GetCollectionName()
returnData["completeTime"] = response.GetCompleteTime()
returnData["state"] = response.GetState().String()
returnData["progress"] = response.GetProgress()
returnData["importedRows"] = response.GetImportedRows()
returnData["totalRows"] = response.GetTotalRows()
reason := response.GetReason()
if reason != "" {
returnData["reason"] = reason
}
details := make([]map[string]interface{}, 0)
totalFileSize := int64(0)
for _, taskProgress := range response.GetTaskProgresses() {
detail := make(map[string]interface{})
detail["fileName"] = taskProgress.GetFileName()
detail["fileSize"] = taskProgress.GetFileSize()
detail["progress"] = taskProgress.GetProgress()
detail["completeTime"] = taskProgress.GetCompleteTime()
detail["state"] = taskProgress.GetState()
detail["importedRows"] = taskProgress.GetImportedRows()
detail["totalRows"] = taskProgress.GetTotalRows()
reason = taskProgress.GetReason()
if reason != "" {
detail["reason"] = reason
}
details = append(details, detail)
totalFileSize += taskProgress.GetFileSize()
}
returnData["fileSize"] = totalFileSize
returnData["details"] = details
c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: returnData})
}

View File

@ -781,6 +781,7 @@ message ImportJob {
string complete_time = 13;
repeated internal.ImportFile files = 14;
repeated common.KeyValuePair options = 15;
string start_time = 16;
}
enum ImportTaskStateV2 {

View File

@ -312,6 +312,9 @@ message ImportTaskProgress {
string reason = 3;
int64 progress = 4;
string complete_time = 5;
string state = 6;
int64 imported_rows = 7;
int64 total_rows = 8;
}
message GetImportProgressResponse {
@ -322,6 +325,9 @@ message GetImportProgressResponse {
string collection_name = 5;
string complete_time = 6;
repeated ImportTaskProgress task_progresses = 7;
int64 imported_rows = 8;
int64 total_rows = 9;
string start_time = 10;
}
message ListImportsRequestInternal {

View File

@ -4267,16 +4267,21 @@ func convertToV1GetImportResponse(rsp *internalpb.GetImportProgressResponse) *mi
Key: progressPercent,
Value: strconv.FormatInt(rsp.GetProgress(), 10),
})
var createTs int64
createTime, err := time.Parse("2006-01-02T15:04:05Z07:00", rsp.GetStartTime())
if err == nil {
createTs = createTime.Unix()
}
return &milvuspb.GetImportStateResponse{
Status: rsp.GetStatus(),
State: convertState(rsp.GetState()),
RowCount: 0,
RowCount: rsp.GetImportedRows(),
IdList: nil,
Infos: infos,
Id: 0,
CollectionId: 0,
SegmentIds: nil,
CreateTs: 0,
CreateTs: createTs,
}
}

View File

@ -30,7 +30,9 @@ import (
const (
StartTs = "start_ts"
StartTs2 = "startTs"
EndTs = "end_ts"
EndTs2 = "endTs"
BackupFlag = "backup"
)
@ -38,21 +40,25 @@ type Options []*commonpb.KeyValuePair
func ParseTimeRange(options Options) (uint64, uint64, error) {
importOptions := funcutil.KeyValuePair2Map(options)
getTimestamp := func(key string, defaultValue uint64) (uint64, error) {
if value, ok := importOptions[key]; ok {
pTs, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return 0, merr.WrapErrImportFailed(fmt.Sprintf("parse %s failed, value=%s, err=%s", key, value, err))
getTimestamp := func(defaultValue uint64, targetKeys ...string) (uint64, error) {
for _, targetKey := range targetKeys {
for key, value := range importOptions {
if strings.EqualFold(key, targetKey) {
pTs, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return 0, merr.WrapErrImportFailed(fmt.Sprintf("parse %s failed, value=%s, err=%s", targetKey, value, err))
}
return tsoutil.ComposeTS(pTs, 0), nil
}
}
return tsoutil.ComposeTS(pTs, 0), nil
}
return defaultValue, nil
}
tsStart, err := getTimestamp(StartTs, 0)
tsStart, err := getTimestamp(0, StartTs, StartTs2)
if err != nil {
return 0, 0, err
}
tsEnd, err := getTimestamp(EndTs, math.MaxUint64)
tsEnd, err := getTimestamp(math.MaxUint64, EndTs, EndTs2)
if err != nil {
return 0, 0, err
}