mirror of https://github.com/milvus-io/milvus.git
enhance: Enhance and modify the return content of ImportV2 (#31192)
1. The Import APIs now provide detailed progress information for each imported file, including the file name, file size, progress, and more.
2. The APIs now return the collection name and the completion time.
3. Other modifications include renaming jobID to jobId and similar adjustments.

issue: https://github.com/milvus-io/milvus/issues/28521

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
parent 652b866587
commit b5c67948b7
@@ -290,7 +290,8 @@ func (c *importChecker) checkImportingJob(job ImportJob) {
 		}
 	}
-	err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed))
+	completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
+	err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime))
 	if err != nil {
 		log.Warn("failed to update job state to Completed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
 	}
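
Note: the layout string "2006-01-02T15:04:05Z07:00" is Go's reference-time layout for RFC 3339; it is the same value the standard library exports as time.RFC3339. A standalone sketch:

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	now := time.Now()
    	// Both lines print the same string, e.g. 2024-03-11T10:30:00+08:00.
    	fmt.Println(now.Format("2006-01-02T15:04:05Z07:00"))
    	fmt.Println(now.Format(time.RFC3339))
    }
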
@@ -55,9 +55,16 @@ func UpdateJobReason(reason string) UpdateJobAction {
 	}
 }
 
+func UpdateJobCompleteTime(completeTime string) UpdateJobAction {
+	return func(job ImportJob) {
+		job.(*importJob).ImportJob.CompleteTime = completeTime
+	}
+}
+
 type ImportJob interface {
 	GetJobID() int64
 	GetCollectionID() int64
+	GetCollectionName() string
 	GetPartitionIDs() []int64
 	GetVchannels() []string
 	GetSchema() *schemapb.CollectionSchema
@@ -65,6 +72,7 @@ type ImportJob interface {
 	GetCleanupTs() uint64
 	GetState() internalpb.ImportJobState
 	GetReason() string
+	GetCompleteTime() string
 	GetFiles() []*internalpb.ImportFile
 	GetOptions() []*commonpb.KeyValuePair
 	Clone() ImportJob
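
Note: UpdateJobCompleteTime follows the functional-options style already used for job updates: each UpdateJobAction is a closure over one field change, and the meta store applies them in order. A minimal, self-contained sketch of the pattern (the types here are simplified stand-ins, not the real ImportJob/imeta):

    package main

    import "fmt"

    type job struct {
    	State        string
    	CompleteTime string
    }

    // updateAction mirrors UpdateJobAction: a closure that mutates one field.
    type updateAction func(*job)

    func updateState(s string) updateAction {
    	return func(j *job) { j.State = s }
    }

    func updateCompleteTime(t string) updateAction {
    	return func(j *job) { j.CompleteTime = t }
    }

    // updateJob applies each action in order, as imeta.UpdateJob does.
    func updateJob(j *job, actions ...updateAction) {
    	for _, a := range actions {
    		a(j)
    	}
    }

    func main() {
    	j := &job{State: "Importing"}
    	updateJob(j, updateState("Completed"), updateCompleteTime("2024-03-11T00:00:00Z"))
    	fmt.Printf("%+v\n", j) // &{State:Completed CompleteTime:2024-03-11T00:00:00Z}
    }
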
@@ -303,7 +303,8 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
 			return
 		}
 	}
-	err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed))
+	completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
+	err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime))
 	if err != nil {
 		log.Warn("update import task failed", WrapTaskLog(task, zap.Error(err))...)
 		return
@@ -87,6 +87,14 @@ func UpdateReason(reason string) UpdateAction {
 	}
 }
 
+func UpdateCompleteTime(completeTime string) UpdateAction {
+	return func(t ImportTask) {
+		if task, ok := t.(*importTask); ok {
+			task.ImportTaskV2.CompleteTime = completeTime
+		}
+	}
+}
+
 func UpdateNodeID(nodeID int64) UpdateAction {
 	return func(t ImportTask) {
 		switch t.GetType() {
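
Note: unlike UpdateJobCompleteTime above, which asserts job.(*importJob) unconditionally, this task-side action uses the comma-ok form, so an unexpected concrete type is silently skipped instead of panicking. The difference in one self-contained example (illustrative types only):

    package main

    import "fmt"

    type task interface{ id() int }

    type importTask struct{ completeTime string }

    func (importTask) id() int { return 1 }

    type otherTask struct{}

    func (otherTask) id() int { return 2 }

    func main() {
    	var t task = otherTask{}

    	// Comma-ok form: a failed assertion yields ok == false, no panic.
    	if it, ok := t.(importTask); ok {
    		fmt.Println("updating", it.completeTime)
    	} else {
    		fmt.Println("not an importTask; skipped safely")
    	}

    	// The unconditional form t.(importTask) would panic here:
    	// "interface conversion: main.task is main.otherTask, not main.importTask".
    }
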
@@ -321,7 +321,7 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 {
 	return importingProgress*0.8 + completedProgress*0.2
 }
 
-func GetImportProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, string) {
+func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, string) {
 	job := imeta.GetJob(jobID)
 	switch job.GetState() {
 	case internalpb.ImportJobState_Pending:
@@ -345,6 +345,31 @@ func GetImportProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, intern
 	return 0, internalpb.ImportJobState_None, "unknown import job state"
 }
 
+func GetTaskProgresses(jobID int64, imeta ImportMeta, meta *meta) []*internalpb.ImportTaskProgress {
+	progresses := make([]*internalpb.ImportTaskProgress, 0)
+	tasks := imeta.GetTaskBy(WithJob(jobID), WithType(ImportTaskType))
+	for _, task := range tasks {
+		totalRows := lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 {
+			return file.GetTotalRows()
+		})
+		importedRows := meta.GetSegmentsTotalCurrentRows(task.(*importTask).GetSegmentIDs())
+		progress := int64(100)
+		if totalRows != 0 {
+			progress = int64(float32(importedRows) / float32(totalRows) * 100)
+		}
+		for _, fileStat := range task.GetFileStats() {
+			progresses = append(progresses, &internalpb.ImportTaskProgress{
+				FileName:     fileStat.GetImportFile().String(),
+				FileSize:     fileStat.GetFileSize(),
+				Reason:       task.GetReason(),
+				Progress:     progress,
+				CompleteTime: task.(*importTask).GetCompleteTime(),
+			})
+		}
+	}
+	return progresses
+}
+
 func DropImportTask(task ImportTask, cluster Cluster, tm ImportMeta) error {
 	if task.GetNodeID() == NullNodeID {
 		return nil
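
Note: progress is computed once per task (imported rows over the task's total rows) and then stamped onto every file of that task, and FileName is the proto String() rendering of the ImportFile rather than a bare path. A hypothetical element of the returned slice (values illustrative, text rendering approximate):

    // Hypothetical values for illustration only.
    p := &internalpb.ImportTaskProgress{
    	FileName:     `paths:"data/book.json"`, // proto text form of the ImportFile
    	FileSize:     1024,
    	Reason:       "",
    	Progress:     50, // shared by every file of the same task
    	CompleteTime: "",
    }
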
@@ -448,7 +448,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
 	// failed state
 	err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(mockErr))
 	assert.NoError(t, err)
-	progress, state, reason := GetImportProgress(job.GetJobID(), imeta, meta)
+	progress, state, reason := GetJobProgress(job.GetJobID(), imeta, meta)
 	assert.Equal(t, int64(0), progress)
 	assert.Equal(t, internalpb.ImportJobState_Failed, state)
 	assert.Equal(t, mockErr, reason)
@@ -456,7 +456,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
 	// pending state
 	err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Pending))
 	assert.NoError(t, err)
-	progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta)
+	progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta)
 	assert.Equal(t, int64(10), progress)
 	assert.Equal(t, internalpb.ImportJobState_Pending, state)
 	assert.Equal(t, "", reason)
@@ -464,7 +464,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
 	// preImporting state
 	err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
 	assert.NoError(t, err)
-	progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta)
+	progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta)
 	assert.Equal(t, int64(10+40), progress)
 	assert.Equal(t, internalpb.ImportJobState_Importing, state)
 	assert.Equal(t, "", reason)
@@ -472,7 +472,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
 	// importing state, segmentImportedRows/totalRows = 0.5
 	err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing))
 	assert.NoError(t, err)
-	progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta)
+	progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta)
 	assert.Equal(t, int64(10+40+40*0.5), progress)
 	assert.Equal(t, internalpb.ImportJobState_Importing, state)
 	assert.Equal(t, "", reason)
@@ -494,7 +494,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
 	assert.NoError(t, err)
 	err = meta.UpdateSegmentsInfo(UpdateImportedRows(22, 100))
 	assert.NoError(t, err)
-	progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta)
+	progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta)
 	assert.Equal(t, int64(float32(10+40+40+10*2/6)), progress)
 	assert.Equal(t, internalpb.ImportJobState_Importing, state)
 	assert.Equal(t, "", reason)
@@ -508,7 +508,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
 	assert.NoError(t, err)
 	err = meta.UpdateSegmentsInfo(UpdateIsImporting(22, false))
 	assert.NoError(t, err)
-	progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta)
+	progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta)
 	assert.Equal(t, int64(10+40+40+10), progress)
 	assert.Equal(t, internalpb.ImportJobState_Importing, state)
 	assert.Equal(t, "", reason)
@@ -516,7 +516,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
 	// completed state
 	err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed))
 	assert.NoError(t, err)
-	progress, state, reason = GetImportProgress(job.GetJobID(), imeta, meta)
+	progress, state, reason = GetJobProgress(job.GetJobID(), imeta, meta)
 	assert.Equal(t, int64(100), progress)
 	assert.Equal(t, internalpb.ImportJobState_Completed, state)
 	assert.Equal(t, "", reason)
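
Note: taken together, these expectations pin down the staged job-progress model: 10 for a pending job, +40 once pre-import is done, +40 scaled by imported/total rows while importing, and a final +10 scaled by how many imported segments are marked no longer importing. A hedged restatement (staging and weights inferred from the test expectations, not copied from the implementation):

    package main

    import "fmt"

    // jobProgress restates the staged model the tests imply.
    func jobProgress(stage string, rowFrac, sealFrac float32) int64 {
    	switch stage {
    	case "pending":
    		return 10
    	case "preimporting":
    		return 10 + 40
    	case "importing":
    		return int64(10 + 40 + 40*rowFrac + 10*sealFrac)
    	case "completed":
    		return 100
    	}
    	return 0
    }

    func main() {
    	fmt.Println(jobProgress("importing", 0.5, 0)) // 70, matching 10+40+40*0.5
    	fmt.Println(jobProgress("importing", 1, 1))   // 100, matching 10+40+40+10
    }
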
@@ -1819,16 +1819,17 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
 
 	job := &importJob{
 		ImportJob: &datapb.ImportJob{
-			JobID:        idStart,
-			CollectionID: in.GetCollectionID(),
-			PartitionIDs: in.GetPartitionIDs(),
-			Vchannels:    in.GetChannelNames(),
-			Schema:       in.GetSchema(),
-			TimeoutTs:    timeoutTs,
-			CleanupTs:    math.MaxUint64,
-			State:        internalpb.ImportJobState_Pending,
-			Files:        files,
-			Options:      in.GetOptions(),
+			JobID:          idStart,
+			CollectionID:   in.GetCollectionID(),
+			CollectionName: in.GetCollectionName(),
+			PartitionIDs:   in.GetPartitionIDs(),
+			Vchannels:      in.GetChannelNames(),
+			Schema:         in.GetSchema(),
+			TimeoutTs:      timeoutTs,
+			CleanupTs:      math.MaxUint64,
+			State:          internalpb.ImportJobState_Pending,
+			Files:          files,
+			Options:        in.GetOptions(),
 		},
 	}
 	err = s.importMeta.AddJob(job)
@@ -1858,10 +1859,14 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport
 		resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("parse job id failed, err=%w", err)))
 		return resp, nil
 	}
-	progress, state, reason := GetImportProgress(jobID, s.importMeta, s.meta)
+	job := s.importMeta.GetJob(jobID)
+	progress, state, reason := GetJobProgress(jobID, s.importMeta, s.meta)
 	resp.State = state
 	resp.Reason = reason
 	resp.Progress = progress
+	resp.CollectionName = job.GetCollectionName()
+	resp.CompleteTime = job.GetCompleteTime()
+	resp.TaskProgresses = GetTaskProgresses(jobID, s.importMeta, s.meta)
 	log.Info("GetImportProgress done", zap.Any("resp", resp))
 	return resp, nil
 }
@@ -1889,11 +1894,12 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq
 	}
 
 	for _, job := range jobs {
-		progress, state, reason := GetImportProgress(job.GetJobID(), s.importMeta, s.meta)
+		progress, state, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta)
 		resp.JobIDs = append(resp.JobIDs, fmt.Sprintf("%d", job.GetJobID()))
 		resp.States = append(resp.States, state)
 		resp.Reasons = append(resp.Reasons, reason)
 		resp.Progresses = append(resp.Progresses, progress)
+		resp.CollectionNames = append(resp.CollectionNames, job.GetCollectionName())
 	}
 	return resp, nil
 }
@@ -150,7 +150,7 @@ func (e *executor) PreImport(task Task) {
 		}
 		defer reader.Close()
 		start := time.Now()
-		err = e.readFileStat(reader, task, i)
+		err = e.readFileStat(reader, task, i, file)
 		if err != nil {
 			e.handleErr(task, err, "preimport failed")
 			return err
@@ -180,7 +180,12 @@ func (e *executor) PreImport(task Task) {
 		WrapLogFields(task, zap.Any("fileStats", task.(*PreImportTask).GetFileStats()))...)
 }
 
-func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error {
+func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int, file *internalpb.ImportFile) error {
+	fileSize, err := GetFileSize(file, e.cm)
+	if err != nil {
+		return err
+	}
+
 	totalRows := 0
 	totalSize := 0
 	hashedStats := make(map[string]*datapb.PartitionImportStats)
@@ -209,6 +214,7 @@ func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx i
 	}
 
 	stat := &datapb.ImportFileStats{
+		FileSize:        fileSize,
 		TotalRows:       int64(totalRows),
 		TotalMemorySize: int64(totalSize),
 		HashedStats:     hashedStats,
@@ -260,6 +260,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport() {
 
 	cm := mocks.NewChunkManager(s.T())
 	ioReader := strings.NewReader(string(bytes))
+	cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
 	cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
 	s.executor.cm = cm
@@ -313,6 +314,7 @@ func (s *ExecutorSuite) TestExecutor_Start_Preimport_Failed() {
 		io.Seeker
 	}
 	ioReader := strings.NewReader(string(bytes))
+	cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
 	cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
 	s.executor.cm = cm
@@ -461,6 +463,11 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() {
 	importFile := &internalpb.ImportFile{
 		Paths: []string{"dummy.json"},
 	}
+
+	cm := mocks.NewChunkManager(s.T())
+	cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
+	s.executor.cm = cm
+
 	var once sync.Once
 	data := createInsertData(s.T(), s.schema, s.numRows)
 	s.reader = importutilv2.NewMockReader(s.T())
@@ -485,7 +492,7 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() {
 	}
 	preimportTask := NewPreImportTask(preimportReq)
 	s.manager.Add(preimportTask)
-	err := s.executor.readFileStat(s.reader, preimportTask, 0)
+	err := s.executor.readFileStat(s.reader, preimportTask, 0, importFile)
 	s.NoError(err)
 }
@@ -92,7 +92,9 @@ func UpdateReason(reason string) UpdateAction {
 func UpdateFileStat(idx int, fileStat *datapb.ImportFileStats) UpdateAction {
 	return func(task Task) {
 		if it, ok := task.(*PreImportTask); ok {
+			it.PreImportTask.FileStats[idx].FileSize = fileStat.GetFileSize()
 			it.PreImportTask.FileStats[idx].TotalRows = fileStat.GetTotalRows()
 			it.PreImportTask.FileStats[idx].TotalMemorySize = fileStat.GetTotalMemorySize()
 			it.PreImportTask.FileStats[idx].HashedStats = fileStat.GetHashedStats()
 		}
 	}
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"strconv"
+	"time"
 
 	"github.com/samber/lo"
 	"go.uber.org/zap"
@@ -29,6 +30,7 @@ import (
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
 	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/params"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
@@ -201,6 +203,23 @@ func GetInsertDataRowCount(data *storage.InsertData, schema *schemapb.Collection
 	return 0
 }
 
+func GetFileSize(file *internalpb.ImportFile, cm storage.ChunkManager) (int64, error) {
+	fn := func(path string) (int64, error) {
+		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+		defer cancel()
+		return cm.Size(ctx, path)
+	}
+	var totalSize int64 = 0
+	for _, path := range file.GetPaths() {
+		size, err := fn(path)
+		if err != nil {
+			return 0, err
+		}
+		totalSize += size
+	}
+	return totalSize, nil
+}
+
 func LogStats(manager TaskManager) {
 	logFunc := func(tasks []Task, taskType TaskType) {
 		byState := lo.GroupBy(tasks, func(t Task) datapb.ImportTaskStateV2 {
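
Note: each path gets its own 20-second deadline rather than one deadline for the whole file, so a file backed by many objects cannot starve its later paths of time budget. A hypothetical call site, mirroring how readFileStat above uses it (the e.cm receiver field is assumed in scope):

    // Hypothetical usage inside a method that has a ChunkManager at e.cm.
    file := &internalpb.ImportFile{
    	Paths: []string{"imports/part-0.parquet", "imports/part-1.parquet"},
    }
    size, err := GetFileSize(file, e.cm) // sums cm.Size over every path
    if err != nil {
    	return err
    }
    // size now holds the combined byte count of both objects.
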
@@ -130,7 +130,7 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) {
 	router.POST(AliasCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &AliasReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.dropAlias)))))
 	router.POST(AliasCategory+AlterAction, timeoutMiddleware(wrapperPost(func() any { return &AliasCollectionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.alterAlias)))))
 
-	router.POST(ImportJobCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listImportJob)))))
+	router.POST(ImportJobCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &OptionalCollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listImportJob)))))
 	router.POST(ImportJobCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &ImportReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.createImportJob)))))
 	router.POST(ImportJobCategory+GetProgressAction, timeoutMiddleware(wrapperPost(func() any { return &JobIDReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.getImportJobProcess)))))
 }
@@ -1642,28 +1642,34 @@ func (h *HandlersV2) alterAlias(ctx context.Context, c *gin.Context, anyReq any,
 }
 
 func (h *HandlersV2) listImportJob(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
-	collectionGetter := anyReq.(requestutil.CollectionNameGetter)
+	var collectionName string
+	if collectionGetter, ok := anyReq.(requestutil.CollectionNameGetter); ok {
+		collectionName = collectionGetter.GetCollectionName()
+	}
 	req := &internalpb.ListImportsRequest{
 		DbName:         dbName,
-		CollectionName: collectionGetter.GetCollectionName(),
+		CollectionName: collectionName,
 	}
 	resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, func(reqCtx context.Context, req any) (interface{}, error) {
 		return h.proxy.ListImports(reqCtx, req.(*internalpb.ListImportsRequest))
 	})
 	if err == nil {
-		returnData := make([]map[string]interface{}, 0)
+		returnData := make(map[string]interface{})
+		records := make([]map[string]interface{}, 0)
 		response := resp.(*internalpb.ListImportsResponse)
 		for i, jobID := range response.GetJobIDs() {
 			jobDetail := make(map[string]interface{})
-			jobDetail["jobID"] = jobID
+			jobDetail["jobId"] = jobID
+			jobDetail["collectionName"] = response.GetCollectionNames()[i]
 			jobDetail["state"] = response.GetStates()[i].String()
 			jobDetail["progress"] = response.GetProgresses()[i]
 			reason := response.GetReasons()[i]
 			if reason != "" {
 				jobDetail["reason"] = reason
 			}
-			returnData = append(returnData, jobDetail)
+			records = append(records, jobDetail)
 		}
+		returnData["records"] = records
 		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: returnData})
 	}
 	return resp, err
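
Note: the list response body therefore changes shape: "data" was previously a bare JSON array of job objects and is now an object wrapping a "records" array, with each record keyed by jobId/collectionName. A standalone sketch that prints a hypothetical payload (IDs and names are illustrative):

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    func main() {
    	data := map[string]interface{}{
    		"records": []map[string]interface{}{
    			{"jobId": "448761313698322011", "collectionName": "book", "state": "Completed", "progress": 100},
    			{"jobId": "448761313698322012", "collectionName": "book", "state": "Importing", "progress": 50},
    		},
    	}
    	out, _ := json.Marshal(map[string]interface{}{"code": 200, "data": data})
    	fmt.Println(string(out))
    }
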
@@ -1690,7 +1696,7 @@ func (h *HandlersV2) createImportJob(ctx context.Context, c *gin.Context, anyReq
 	})
 	if err == nil {
 		returnData := make(map[string]interface{})
-		returnData["jobID"] = resp.(*internalpb.ImportResponse).GetJobID()
+		returnData["jobId"] = resp.(*internalpb.ImportResponse).GetJobID()
 		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: returnData})
 	}
 	return resp, err
@@ -1708,13 +1714,28 @@ func (h *HandlersV2) getImportJobProcess(ctx context.Context, c *gin.Context, an
 	if err == nil {
 		response := resp.(*internalpb.GetImportProgressResponse)
 		returnData := make(map[string]interface{})
-		returnData["jobID"] = jobIDGetter.GetJobID()
+		returnData["jobId"] = jobIDGetter.GetJobID()
+		returnData["collectionName"] = response.GetCollectionName()
 		returnData["state"] = response.GetState().String()
 		returnData["progress"] = response.GetProgress()
 		reason := response.GetReason()
 		if reason != "" {
 			returnData["reason"] = reason
 		}
+		details := make([]map[string]interface{}, 0)
+		for _, taskProgress := range response.GetTaskProgresses() {
+			detail := make(map[string]interface{})
+			detail["fileName"] = taskProgress.GetFileName()
+			detail["fileSize"] = taskProgress.GetFileSize()
+			detail["progress"] = taskProgress.GetProgress()
+			detail["completeTime"] = taskProgress.GetCompleteTime()
+			reason = taskProgress.GetReason()
+			if reason != "" {
+				detail["reason"] = reason
+			}
+			details = append(details, detail)
+		}
+		returnData["details"] = details
 		c.JSON(http.StatusOK, gin.H{HTTPReturnCode: http.StatusOK, HTTPReturnData: returnData})
 	}
 	return resp, err
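
Note: the progress endpoint likewise gains per-file detail: alongside the job-level state and progress, "details" lists one entry per file with its name, size, progress, and completion time. A standalone sketch of a hypothetical payload (values are illustrative):

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    func main() {
    	returnData := map[string]interface{}{
    		"jobId":          "448761313698322011",
    		"collectionName": "book",
    		"state":          "Importing",
    		"progress":       65,
    		"details": []map[string]interface{}{
    			{"fileName": `paths:"book1.json"`, "fileSize": 1024, "progress": 100, "completeTime": "2024-03-11T00:00:00Z"},
    			{"fileName": `paths:"book2.json"`, "fileSize": 2048, "progress": 30, "completeTime": ""},
    		},
    	}
    	out, _ := json.Marshal(returnData)
    	fmt.Println(string(out))
    }
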
@@ -898,8 +898,9 @@ func TestMethodPost(t *testing.T) {
 			internalpb.ImportJobState_Failed,
 			internalpb.ImportJobState_Completed,
 		},
-		Reasons:    []string{"", "", "mock reason", ""},
-		Progresses: []int64{0, 30, 0, 100},
+		Reasons:         []string{"", "", "mock reason", ""},
+		Progresses:      []int64{0, 30, 0, 100},
+		CollectionNames: []string{"AAA", "BBB", "CCC", "DDD"},
 	}, nil).Once()
 	mp.EXPECT().GetImportProgress(mock.Anything, mock.Anything).Return(&internalpb.GetImportProgressResponse{
 		Status: &StatusSuccess,
@@ -985,7 +986,7 @@ func TestMethodPost(t *testing.T) {
 				`"userName": "` + util.UserRoot + `", "password": "Milvus", "newPassword": "milvus", "roleName": "` + util.RoleAdmin + `",` +
 				`"roleName": "` + util.RoleAdmin + `", "objectType": "Global", "objectName": "*", "privilege": "*",` +
 				`"aliasName": "` + DefaultAliasName + `",` +
-				`"jobID": "1234567890",` +
+				`"jobId": "1234567890",` +
 				`"files": [["book.json"]]` +
 				`}`))
 			req := httptest.NewRequest(http.MethodPost, testcase.path, bodyReader)
@@ -33,6 +33,19 @@ func (req *CollectionNameReq) GetPartitionNames() []string {
 	return req.PartitionNames
 }
 
+type OptionalCollectionNameReq struct {
+	DbName         string `json:"dbName"`
+	CollectionName string `json:"collectionName"`
+}
+
+func (req *OptionalCollectionNameReq) GetDbName() string {
+	return req.DbName
+}
+
+func (req *OptionalCollectionNameReq) GetCollectionName() string {
+	return req.CollectionName
+}
+
 type RenameCollectionReq struct {
 	DbName         string `json:"dbName"`
 	CollectionName string `json:"collectionName" binding:"required"`
@@ -82,7 +95,7 @@ func (req *ImportReq) GetOptions() map[string]string {
 }
 
 type JobIDReq struct {
-	JobID string `json:"jobID" binding:"required"`
+	JobID string `json:"jobId" binding:"required"`
 }
 
 func (req *JobIDReq) GetJobID() string { return req.JobID }
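
Note: only the JSON tag changes here; the Go field is still JobID. Because encoding/json matches object keys case-insensitively, bodies sent with the old "jobID" casing still bind, so the rename standardizes the documented casing rather than breaking existing clients. A standalone check:

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    // Mirror of JobIDReq after the rename (the binding tag is omitted here;
    // it is a gin concern, not encoding/json's).
    type JobIDReq struct {
    	JobID string `json:"jobId"`
    }

    func main() {
    	var a, b JobIDReq
    	_ = json.Unmarshal([]byte(`{"jobId": "1"}`), &a)
    	_ = json.Unmarshal([]byte(`{"jobID": "2"}`), &b) // old casing still matches
    	fmt.Println(a.JobID, b.JobID) // 1 2
    }
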
@@ -859,15 +859,17 @@ message ImportJob {
     int64 jobID = 1;
     int64 dbID = 2;
     int64 collectionID = 3;
-    repeated int64 partitionIDs = 4;
-    repeated string vchannels = 5;
-    schema.CollectionSchema schema = 6;
-    uint64 timeout_ts = 7;
-    uint64 cleanup_ts = 8;
-    internal.ImportJobState state = 9;
-    string reason = 10;
-    repeated internal.ImportFile files = 11;
-    repeated common.KeyValuePair options = 12;
+    string collection_name = 4;
+    repeated int64 partitionIDs = 5;
+    repeated string vchannels = 6;
+    schema.CollectionSchema schema = 7;
+    uint64 timeout_ts = 8;
+    uint64 cleanup_ts = 9;
+    internal.ImportJobState state = 10;
+    string reason = 11;
+    string complete_time = 12;
+    repeated internal.ImportFile files = 13;
+    repeated common.KeyValuePair options = 14;
 }
 
 enum ImportTaskStateV2 {
@@ -896,6 +898,7 @@ message ImportTaskV2 {
     int64 nodeID = 5;
    ImportTaskStateV2 state = 6;
     string reason = 7;
+    string complete_time = 8;
     repeated ImportFileStats file_stats = 9;
 }
@@ -280,11 +280,12 @@ message ImportFile {
 message ImportRequestInternal {
     int64 dbID = 1;
     int64 collectionID = 2;
-    repeated int64 partitionIDs = 3;
-    repeated string channel_names = 4;
-    schema.CollectionSchema schema = 5;
-    repeated ImportFile files = 6;
-    repeated common.KeyValuePair options = 7;
+    string collection_name = 3;
+    repeated int64 partitionIDs = 4;
+    repeated string channel_names = 5;
+    schema.CollectionSchema schema = 6;
+    repeated ImportFile files = 7;
+    repeated common.KeyValuePair options = 8;
 }
 
 message ImportRequest {
@@ -305,11 +306,22 @@ message GetImportProgressRequest {
     string jobID = 2;
 }
 
+message ImportTaskProgress {
+    string file_name = 1;
+    int64 file_size = 2;
+    string reason = 3;
+    int64 progress = 4;
+    string complete_time = 5;
+}
+
 message GetImportProgressResponse {
     common.Status status = 1;
     ImportJobState state = 2;
     string reason = 3;
     int64 progress = 4;
+    string collection_name = 5;
+    string complete_time = 6;
+    repeated ImportTaskProgress task_progresses = 7;
 }
 
 message ListImportsRequestInternal {
@@ -328,4 +340,5 @@ message ListImportsResponse {
     repeated ImportJobState states = 3;
     repeated string reasons = 4;
     repeated int64 progresses = 5;
+    repeated string collection_names = 6;
 }
@@ -5643,12 +5643,13 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
 		}
 	}
 	importRequest := &internalpb.ImportRequestInternal{
-		CollectionID: collectionID,
-		PartitionIDs: partitionIDs,
-		ChannelNames: channels,
-		Schema:       schema.CollectionSchema,
-		Files:        req.GetFiles(),
-		Options:      req.GetOptions(),
+		CollectionID:   collectionID,
+		CollectionName: req.GetCollectionName(),
+		PartitionIDs:   partitionIDs,
+		ChannelNames:   channels,
+		Schema:         schema.CollectionSchema,
+		Files:          req.GetFiles(),
+		Options:        req.GetOptions(),
 	}
 	resp, err = node.dataCoord.ImportV2(ctx, importRequest)
 	if err != nil {