mirror of https://github.com/milvus-io/milvus.git
Refine bulkinsert (#20986)
Signed-off-by: yhmo <yihua.mo@zilliz.com>
Branch: pull/20987/head
parent e131915207
commit 33a102c6fc
@@ -978,7 +978,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 	defer cancel()
 	// func to report import state to RootCoord.
 	reportFunc := func(res *rootcoordpb.ImportResult) error {
-		status, err := node.rootCoord.ReportImport(newCtx, res)
+		status, err := node.rootCoord.ReportImport(ctx, res)
 		if err != nil {
 			log.Error("fail to report import state to RootCoord", zap.Error(err))
 			return err
@@ -1017,7 +1017,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 		msg := "DataNode alloc ts failed"
 		log.Warn(msg)
 		importResult.State = commonpb.ImportState_ImportFailed
-		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: msg})
+		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.FailedReason, Value: msg})
 		if reportErr := reportFunc(importResult); reportErr != nil {
 			log.Warn("fail to report import state to RootCoord", zap.Error(reportErr))
 		}
@@ -1040,7 +1040,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 			zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
 			zap.Error(err))
 		importResult.State = commonpb.ImportState_ImportFailed
-		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: err.Error()})
+		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.FailedReason, Value: err.Error()})
 		reportErr := reportFunc(importResult)
 		if reportErr != nil {
 			log.Warn("fail to report import state to RootCoord", zap.Error(err))
@@ -1056,7 +1056,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
 			zap.Int64("task ID", req.GetImportTask().GetTaskId()),
 			zap.Error(inputErr))
 		importResult.State = commonpb.ImportState_ImportFailed
-		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: "failed_reason", Value: inputErr.Error()})
+		importResult.Infos = append(importResult.Infos, &commonpb.KeyValuePair{Key: importutil.FailedReason, Value: inputErr.Error()})
		reportErr := reportFunc(importResult)
 		if reportErr != nil {
 			log.Warn("fail to report import state to RootCoord", zap.Error(inputErr))

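Note on the four DataNode hunks above: each makes the same two-part change. ReportImport is now issued on the incoming request context `ctx` rather than the derived `newCtx` (the context whose `defer cancel()` appears in the first hunk, so presumably short-lived), and the bare "failed_reason" literal is replaced by the shared importutil.FailedReason constant. The failure path the hunks converge on can be summarized by the following sketch; it is a minimal illustration with hypothetical names (reportFailure, keyValue), not code from the patch:

	package main

	import "fmt"

	// failedReason mirrors the value of importutil.FailedReason.
	const failedReason = "failed_reason"

	type keyValue struct{ Key, Value string }

	type importResult struct {
		State string
		Infos []keyValue
	}

	// reportFailure marks the result failed, records the reason under the
	// shared key, and reports it; a report error is logged rather than
	// returned, matching the hunks above.
	func reportFailure(res *importResult, reason string, report func(*importResult) error) {
		res.State = "ImportFailed"
		res.Infos = append(res.Infos, keyValue{Key: failedReason, Value: reason})
		if err := report(res); err != nil {
			fmt.Println("fail to report import state to RootCoord:", err)
		}
	}

	func main() {
		res := &importResult{}
		reportFailure(res, "DataNode alloc ts failed", func(*importResult) error { return nil })
		fmt.Printf("%+v\n", *res)
	}
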
@@ -41,10 +41,6 @@ import (
 )
 
 const (
-	FailedReason    = "failed_reason"
-	Files           = "files"
-	CollectionName  = "collection"
-	PartitionName   = "partition"
 	MaxPendingCount = 32
 	delimiter       = "/"
 )
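The four keyword constants removed here are not dropped from the codebase: a later hunk in this same diff (`@@ -57,6 +58,13 @@ const (`) re-adds them in the importutil package, together with the new PersistTimeCost key, so RootCoord and DataNode reference a single shared definition instead of duplicating string literals.
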
@@ -290,50 +286,68 @@ func (m *importManager) flipTaskState(ctx context.Context) error {
 			// we need to set the task to failed, because the checkIndexingDone() cannot know
 			// whether the collection has been dropped.
 
-			resp := m.getTaskState(task.GetId())
-			ok, err := m.checkIndexingDone(ctx, resp.GetCollectionId(), resp.GetSegmentIds())
-			if err != nil {
-				log.Error("an error occurred while checking index state of segments",
-					zap.Int64("task ID", task.GetId()),
-					zap.Error(err))
-				// Failed to check indexing state of segments. Skip this task.
-				continue
-			}
-			if ok {
-				if err := m.setImportTaskState(resp.GetId(), commonpb.ImportState_ImportCompleted); err != nil {
-					log.Error("failed to set import task state",
-						zap.Int64("task ID", resp.GetId()),
-						zap.Any("target state", commonpb.ImportState_ImportCompleted),
-						zap.Error(err))
-					// Failed to update task's state. Skip this task.
-					continue
-				}
-				log.Info("indexes are successfully built and the import task has complete!",
-					zap.Int64("task ID", resp.GetId()))
-				log.Info("now start unsetting isImporting state of segments",
-					zap.Int64("task ID", resp.GetId()),
-					zap.Int64s("segment IDs", resp.GetSegmentIds()))
-				// Remove the `isImport` states of these segments only when the import task reaches `ImportState_ImportCompleted` state.
-				status, err := m.callUnsetIsImportingState(ctx, &datapb.UnsetIsImportingStateRequest{
-					SegmentIds: resp.GetSegmentIds(),
-				})
-				if err != nil {
-					log.Error("failed to unset importing state of all segments (could be partial failure)",
-						zap.Error(err))
-				}
-				if status.GetErrorCode() != commonpb.ErrorCode_Success {
-					log.Error("failed to unset importing state of all segments (could be partial failure)",
-						zap.Error(errors.New(status.GetReason())))
-				}
-			}
+			// if this method failed, skip this task, try again in next round
+			m.flipTaskIndexState(ctx, task.GetId())
 		}
 	}
 	return nil
 }
 
+func (m *importManager) flipTaskIndexState(ctx context.Context, taskID int64) error {
+	resp := m.getTaskState(taskID)
+	ok, err := m.checkIndexingDone(ctx, resp.GetCollectionId(), resp.GetSegmentIds())
+	if err != nil {
+		log.Error("an error occurred while checking index state of segments",
+			zap.Int64("task ID", taskID),
+			zap.Error(err))
+		// Failed to check indexing state of segments
+		return err
+	}
+	if ok {
+		if err := m.setImportTaskState(resp.GetId(), commonpb.ImportState_ImportCompleted); err != nil {
+			log.Error("failed to set import task state",
+				zap.Int64("task ID", resp.GetId()),
+				zap.Any("target state", commonpb.ImportState_ImportCompleted),
+				zap.Error(err))
+			// Failed to update task's state
+			return err
+		}
+		log.Info("indexes are successfully built and the import task has complete!",
+			zap.Int64("task ID", resp.GetId()))
+		log.Info("now start unsetting isImporting state of segments",
+			zap.Int64("task ID", resp.GetId()),
+			zap.Int64s("segment IDs", resp.GetSegmentIds()))
+		// Remove the `isImport` states of these segments only when the import task reaches `ImportState_ImportCompleted` state.
+		if m.callUnsetIsImportingState == nil {
+			log.Error("callUnsetIsImportingState function of importManager is nil")
+			return fmt.Errorf("failed to describe index: segment state method of import manager is nil")
+		}
+		status, err := m.callUnsetIsImportingState(ctx, &datapb.UnsetIsImportingStateRequest{
+			SegmentIds: resp.GetSegmentIds(),
+		})
+		if err != nil {
+			log.Error("failed to unset importing state of all segments (could be partial failure)",
+				zap.Error(err))
+			return err
+		}
+		if status.GetErrorCode() != commonpb.ErrorCode_Success {
+			log.Error("failed to unset importing state of all segments (could be partial failure)",
+				zap.Error(errors.New(status.GetReason())))
+			return errors.New(status.GetReason())
+		}
+	}
+
+	return nil
+}
+
 // checkIndexingDone checks if indexes are successfully built on segments in `allSegmentIDs`.
 // It returns error on errors. It returns true if indexes are successfully built on all segments and returns false otherwise.
 func (m *importManager) checkIndexingDone(ctx context.Context, collID UniqueID, allSegmentIDs []UniqueID) (bool, error) {
 	if m.callDescribeIndex == nil {
 		log.Error("callDescribeIndex function of importManager is nil")
 		return false, fmt.Errorf("failed to describe index: describe index method of import manager is nil")
 	}
 
 	// Check if collection has indexed fields.
 	var descIdxResp *indexpb.DescribeIndexResponse
 	var err error

@@ -561,47 +575,70 @@ func (m *importManager) updateTaskInfo(ir *rootcoordpb.ImportResult) (*datapb.ImportTaskInfo, error) {
 	}
 	log.Debug("import manager update task import result", zap.Int64("taskID", ir.GetTaskId()))
 
-	found := false
-	var v *datapb.ImportTaskInfo
-	m.workingLock.Lock()
-	defer m.workingLock.Unlock()
-	ok := false
-	var toPersistImportTaskInfo *datapb.ImportTaskInfo
-	if v, ok = m.workingTasks[ir.GetTaskId()]; ok {
-		// If the task has already been marked failed. Prevent further state updating and return an error.
-		if v.GetState().GetStateCode() == commonpb.ImportState_ImportFailed ||
-			v.GetState().GetStateCode() == commonpb.ImportState_ImportFailedAndCleaned {
-			log.Warn("trying to update an already failed task which will end up being a no-op")
-			return nil, errors.New("trying to update an already failed task " + strconv.FormatInt(ir.GetTaskId(), 10))
-		}
-		found = true
-		// Meta persist should be done before memory objs change.
-		toPersistImportTaskInfo = cloneImportTaskInfo(v)
-		toPersistImportTaskInfo.State.StateCode = ir.GetState()
-		toPersistImportTaskInfo.State.Segments = ir.GetSegments()
-		toPersistImportTaskInfo.State.RowCount = ir.GetRowCount()
-		toPersistImportTaskInfo.State.RowIds = ir.GetAutoIds()
-		for _, kv := range ir.GetInfos() {
-			if kv.GetKey() == FailedReason {
-				toPersistImportTaskInfo.State.ErrorMessage = kv.GetValue()
-				break
-			}
-		}
-	}
-	// Update task in task store.
-	if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil {
-		log.Error("failed to update import task",
-			zap.Int64("task ID", v.GetId()),
-			zap.Error(err))
-		return nil, err
-	}
-	if !found {
-		log.Debug("import manager update task import result failed", zap.Int64("task ID", ir.GetTaskId()))
-		return nil, errors.New("failed to update import task, ID not found: " + strconv.FormatInt(ir.TaskId, 10))
-	}
-	m.workingTasks[ir.GetTaskId()] = toPersistImportTaskInfo
-
-	return toPersistImportTaskInfo, nil
+	updatedInfo, err := func() (*datapb.ImportTaskInfo, error) {
+		found := false
+		var v *datapb.ImportTaskInfo
+		m.workingLock.Lock()
+		defer m.workingLock.Unlock()
+		ok := false
+		var toPersistImportTaskInfo *datapb.ImportTaskInfo
+
+		if v, ok = m.workingTasks[ir.GetTaskId()]; ok {
+			// If the task has already been marked failed. Prevent further state updating and return an error.
+			if v.GetState().GetStateCode() == commonpb.ImportState_ImportFailed ||
+				v.GetState().GetStateCode() == commonpb.ImportState_ImportFailedAndCleaned {
+				log.Warn("trying to update an already failed task which will end up being a no-op")
+				return nil, errors.New("trying to update an already failed task " + strconv.FormatInt(ir.GetTaskId(), 10))
+			}
+			found = true
+
+			// Meta persist should be done before memory objs change.
+			toPersistImportTaskInfo = cloneImportTaskInfo(v)
+			toPersistImportTaskInfo.State.StateCode = ir.GetState()
+			toPersistImportTaskInfo.State.Segments = ir.GetSegments()
+			toPersistImportTaskInfo.State.RowCount = ir.GetRowCount()
+			toPersistImportTaskInfo.State.RowIds = ir.GetAutoIds()
+			for _, kv := range ir.GetInfos() {
+				if kv.GetKey() == importutil.FailedReason {
+					toPersistImportTaskInfo.State.ErrorMessage = kv.GetValue()
+					break
+				} else if kv.GetKey() == importutil.PersistTimeCost {
+					toPersistImportTaskInfo.Infos = append(toPersistImportTaskInfo.Infos, kv)
+				}
+			}
+			// Update task in task store.
+			if err := m.persistTaskInfo(toPersistImportTaskInfo); err != nil {
+				log.Error("failed to update import task",
+					zap.Int64("task ID", v.GetId()),
+					zap.Error(err))
+				return nil, err
+			}
+			m.workingTasks[ir.GetTaskId()] = toPersistImportTaskInfo
+		}
+
+		if !found {
+			log.Debug("import manager update task import result failed", zap.Int64("task ID", ir.GetTaskId()))
+			return nil, errors.New("failed to update import task, ID not found: " + strconv.FormatInt(ir.TaskId, 10))
+		}
+
+		return toPersistImportTaskInfo, nil
+	}()
+
+	if err != nil {
+		return nil, err
+	}
+
+	// if is ImportState_ImportPersisted, and index is FLAT, set the task to be completed immediately
+	// this method is called from importWrapper.reportPersisted() to rootCoord.ReportImport(),
+	// if flipTaskIndexState failed, the outer caller(importWrapper) will retry 3 times
+	if ir.GetState() == commonpb.ImportState_ImportPersisted {
+		err = m.flipTaskIndexState(m.ctx, updatedInfo.GetId())
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return updatedInfo, nil
 }
 
 // setImportTaskState sets the task state of an import task. Changes to the import task state will be persisted.
@@ -708,13 +745,14 @@ func (m *importManager) copyTaskInfo(input *datapb.ImportTaskInfo, output *milvuspb.GetImportStateResponse) {
 	output.IdList = input.GetState().GetRowIds()
 	output.SegmentIds = input.GetState().GetSegments()
 	output.CreateTs = input.GetCreateTs()
-	output.Infos = append(output.Infos, &commonpb.KeyValuePair{Key: Files, Value: strings.Join(input.GetFiles(), ",")})
-	output.Infos = append(output.Infos, &commonpb.KeyValuePair{Key: CollectionName, Value: input.GetCollectionName()})
-	output.Infos = append(output.Infos, &commonpb.KeyValuePair{Key: PartitionName, Value: input.GetPartitionName()})
+	output.Infos = append(output.Infos, &commonpb.KeyValuePair{Key: importutil.Files, Value: strings.Join(input.GetFiles(), ",")})
+	output.Infos = append(output.Infos, &commonpb.KeyValuePair{Key: importutil.CollectionName, Value: input.GetCollectionName()})
+	output.Infos = append(output.Infos, &commonpb.KeyValuePair{Key: importutil.PartitionName, Value: input.GetPartitionName()})
 	output.Infos = append(output.Infos, &commonpb.KeyValuePair{
-		Key:   FailedReason,
+		Key:   importutil.FailedReason,
 		Value: input.GetState().GetErrorMessage(),
 	})
+	output.Infos = append(output.Infos, input.Infos...)
 }
 
 // getTaskState looks for task with the given ID and returns its import state.

@@ -796,8 +796,9 @@ func TestImportManager_TaskState(t *testing.T) {
 	info := &rootcoordpb.ImportResult{
 		TaskId: 10000,
 	}
+	// the task id doesn't exist
 	_, err := mgr.updateTaskInfo(info)
-	assert.NotNil(t, err)
+	assert.Error(t, err)
 
 	info = &rootcoordpb.ImportResult{
 		TaskId: 2,
@@ -809,18 +810,71 @@ func TestImportManager_TaskState(t *testing.T) {
 				Value: "value1",
 			},
 			{
-				Key:   "failed_reason",
+				Key:   importutil.FailedReason,
 				Value: "some_reason",
 			},
 		},
 	}
+
+	// callDescribeIndex method is nil
+	_, err = mgr.updateTaskInfo(info)
+	assert.Error(t, err)
+
+	mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
+		return &indexpb.DescribeIndexResponse{
+			Status: &commonpb.Status{
+				ErrorCode: commonpb.ErrorCode_UnexpectedError,
+			},
+		}, nil
+	}
+
+	// describe index failed, return error
+	_, err = mgr.updateTaskInfo(info)
+	assert.Error(t, err)
+
+	mgr.callDescribeIndex = func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
+		return &indexpb.DescribeIndexResponse{
+			Status: &commonpb.Status{
+				ErrorCode: commonpb.ErrorCode_IndexNotExist,
+			},
+		}, nil
+	}
+	// index doesn't exist, but callUnsetIsImportingState is nil, return error
+	_, err = mgr.updateTaskInfo(info)
+	assert.Error(t, err)
+
+	mgr.callUnsetIsImportingState = func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) {
+		return &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_UnexpectedError,
+		}, nil
+	}
+	// index doesn't exist, but failed to unset importing state, return error
+	_, err = mgr.updateTaskInfo(info)
+	assert.Error(t, err)
+
+	mgr.callUnsetIsImportingState = func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) {
+		return &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_Success,
+		}, errors.New("error to unset importing state")
+	}
+	// index doesn't exist, but failed to unset importing state, return error
+	_, err = mgr.updateTaskInfo(info)
+	assert.Error(t, err)
+
+	mgr.callUnsetIsImportingState = func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) {
+		return &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_Success,
+		}, nil
+	}
+	// index doesn't exist, the persist task will be set to completed
 	ti, err := mgr.updateTaskInfo(info)
 	assert.NoError(t, err)
 
 	assert.Equal(t, int64(2), ti.GetId())
 	assert.Equal(t, int64(100), ti.GetCollectionId())
 	assert.Equal(t, int64(0), ti.GetPartitionId())
 	assert.Equal(t, []string{"f2.json"}, ti.GetFiles())
-	assert.Equal(t, commonpb.ImportState_ImportPersisted, ti.GetState().GetStateCode())
+	assert.Equal(t, commonpb.ImportState_ImportCompleted, ti.GetState().GetStateCode())
 	assert.Equal(t, int64(1000), ti.GetState().GetRowCount())
 
 	resp := mgr.getTaskState(10000)
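The mock progression above walks updateTaskInfo through each new failure branch before the success case: a nil callDescribeIndex, a DescribeIndex error status, a nil callUnsetIsImportingState, an unset-state error status, an unset-state transport error, and finally success. The completion rule the passing case exercises appears to be that ErrorCode_IndexNotExist counts as indexing done (there is no index to wait for), so a freshly persisted task flips straight to ImportCompleted. A self-contained sketch of that decision with stand-in types (an assumption inferred from the tested behavior, not a copy of checkIndexingDone):

	package main

	import "fmt"

	// errorCode stands in for commonpb.ErrorCode in this sketch.
	type errorCode int

	const (
		success errorCode = iota
		indexNotExist
		unexpectedError
	)

	// indexingDone mirrors the decision the mocks above drive.
	func indexingDone(code errorCode) (bool, error) {
		switch code {
		case indexNotExist:
			return true, nil // no index on the collection: nothing to wait for
		case success:
			return true, nil // the real code also inspects per-segment index states
		default:
			return false, fmt.Errorf("describe index failed: code=%d", code)
		}
	}

	func main() {
		done, err := indexingDone(indexNotExist)
		fmt.Println(done, err) // true <nil> -> task flips to ImportCompleted
	}
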
@@ -828,7 +882,7 @@ func TestImportManager_TaskState(t *testing.T) {
 
 	resp = mgr.getTaskState(2)
 	assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
-	assert.Equal(t, commonpb.ImportState_ImportPersisted, resp.State)
+	assert.Equal(t, commonpb.ImportState_ImportCompleted, resp.State)
 
 	resp = mgr.getTaskState(1)
 	assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
@@ -844,7 +898,7 @@ func TestImportManager_TaskState(t *testing.T) {
 			Value: "value1",
 		},
 		{
-			Key:   "failed_reason",
+			Key:   importutil.FailedReason,
 			Value: "some_reason",
 		},
 	},
@@ -981,11 +1035,11 @@ func TestImportManager_ListAllTasks(t *testing.T) {
 			compareReq = rowReq2
 		}
 		for _, kv := range task.GetInfos() {
-			if kv.GetKey() == CollectionName {
+			if kv.GetKey() == importutil.CollectionName {
 				assert.Equal(t, compareReq.GetCollectionName(), kv.GetValue())
-			} else if kv.GetKey() == PartitionName {
+			} else if kv.GetKey() == importutil.PartitionName {
 				assert.Equal(t, compareReq.GetPartitionName(), kv.GetValue())
-			} else if kv.GetKey() == Files {
+			} else if kv.GetKey() == importutil.Files {
 				assert.Equal(t, strings.Join(compareReq.GetFiles(), ","), kv.GetValue())
 			}
 		}

@@ -18,6 +18,7 @@ import (
 	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
+	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/proxypb"
 	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
@@ -1127,6 +1128,20 @@ func TestCore_ReportImport(t *testing.T) {
 		}, nil
 	}
 
+	callDescribeIndex := func(ctx context.Context, colID UniqueID) (*indexpb.DescribeIndexResponse, error) {
+		return &indexpb.DescribeIndexResponse{
+			Status: &commonpb.Status{
+				ErrorCode: commonpb.ErrorCode_IndexNotExist,
+			},
+		}, nil
+	}
+
+	callUnsetIsImportingState := func(context.Context, *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) {
+		return &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_Success,
+		}, nil
+	}
+
 	t.Run("not healthy", func(t *testing.T) {
 		ctx := context.Background()
 		c := newTestCore(withAbnormalCode())
@@ -1188,7 +1203,8 @@ func TestCore_ReportImport(t *testing.T) {
 		withTtSynchronizer(ticker),
 		withDataCoord(dc))
 	c.broker = newServerBroker(c)
-	c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil, nil, nil, nil)
+	c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, nil,
+		callDescribeIndex, nil, callUnsetIsImportingState)
 	c.importManager.loadFromTaskStore(true)
 	c.importManager.sendOutTasks(ctx)

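newImportManager now takes two additional function-valued dependencies, callDescribeIndex and callUnsetIsImportingState, which is what lets the tests above swap in stubs without a mocking framework. The injection style, reduced to a sketch with illustrative names:

	package main

	import "fmt"

	// The import manager's RPC dependencies are plain func fields in the
	// real code; the field and type names here are illustrative.
	type importManager struct {
		describeIndex func(collID int64) (string, error)
	}

	func newImportManager(describeIndex func(int64) (string, error)) *importManager {
		return &importManager{describeIndex: describeIndex}
	}

	func main() {
		m := newImportManager(func(int64) (string, error) { return "IndexNotExist", nil })
		code, _ := m.describeIndex(100)
		fmt.Println(code) // drives the completion path, as in TestCore_ReportImport
	}
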
@@ -338,7 +338,7 @@ func printFieldsDataInfo(fieldsData map[storage.FieldID]storage.FieldData, msg string, files []string) {
 	}
 
 	if len(files) > 0 {
-		stats = append(stats, zap.Any("files", files))
+		stats = append(stats, zap.Any(Files, files))
 	}
 	log.Info(msg, stats...)
 }

@@ -20,6 +20,7 @@ import (
 	"bufio"
 	"context"
 	"fmt"
+	"strconv"
 
 	"go.uber.org/zap"
 
@@ -57,6 +58,13 @@ const (
 	// if the shard number is a large number, although single segment size is small, but there are lot of in-memory segments,
 	// the total memory size might cause OOM.
 	MaxTotalSizeInMemory = 2 * 1024 * 1024 * 1024 // 2GB
+
+	// keywords of import task information
+	FailedReason    = "failed_reason"
+	Files           = "files"
+	CollectionName  = "collection"
+	PartitionName   = "partition"
+	PersistTimeCost = "persist_cost"
 )
 
 // ReportImportAttempts is the maximum # of attempts to retry when import fails.
@@ -294,6 +302,7 @@ func (p *ImportWrapper) fileValidation(filePaths []string) (bool, error) {
 // if onlyValidate is true, this process only do validation, no data generated, flushFunc will not be called
 func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error {
 	log.Info("import wrapper: begin import", zap.Any("filePaths", filePaths), zap.Any("options", options))
+
 	// data restore function to import milvus native binlog files(for backup/restore tools)
 	// the backup/restore tool provide two paths for a partition, the first path is binlog path, the second is deltalog path
 	if options.IsBackup && p.isBinlogImport(filePaths) {
@@ -306,6 +315,7 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
 		return err
 	}
 
+	tr := timerecord.NewTimeRecorder("Import task")
 	if rowBased {
 		// parse and consume row-based files
 		// for row-based files, the JSONRowConsumer will generate autoid for primary key, and split rows into segments
@@ -346,9 +356,6 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
 	}
 
 	printFieldsDataInfo(fields, "import wrapper: combine field data", nil)
-	tr := timerecord.NewTimeRecorder("combine field data")
-	defer tr.Elapse("finished")
-
 	for k, v := range fields {
 		// ignore 0 row field
 		if v.RowNum() == 0 {
@@ -411,17 +418,23 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
 		triggerGC()
 	}
 
-	return p.reportPersisted(p.reportImportAttempts)
+	return p.reportPersisted(p.reportImportAttempts, tr)
 }
 
 // reportPersisted notify the rootcoord to mark the task state to be ImportPersisted
-func (p *ImportWrapper) reportPersisted(reportAttempts uint) error {
+func (p *ImportWrapper) reportPersisted(reportAttempts uint, tr *timerecord.TimeRecorder) error {
 	// force close all segments
 	err := p.closeAllWorkingSegments()
 	if err != nil {
 		return err
 	}
 
+	if tr != nil {
+		ts := tr.Elapse("persist finished").Seconds()
+		p.importResult.Infos = append(p.importResult.Infos,
+			&commonpb.KeyValuePair{Key: PersistTimeCost, Value: strconv.FormatFloat(ts, 'f', 2, 64)})
+	}
+
 	// report file process state
 	p.importResult.State = commonpb.ImportState_ImportPersisted
 	// persist state task is valuable, retry more times in case fail this task only because of network error
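reportPersisted now accepts the task-level TimeRecorder started at the top of Import (and, in the next hunk, in doBinlogImport); the short-lived "combine field data" recorder removed earlier becomes redundant. When the recorder is non-nil, the elapsed seconds are attached to the import result under PersistTimeCost. A sketch of the formatting logic, substituting the standard library for Milvus's internal timerecord package:

	package main

	import (
		"fmt"
		"strconv"
		"time"
	)

	func main() {
		start := time.Now() // timerecord.NewTimeRecorder("Import task") in the diff
		// ... parse input files and persist segments ...
		ts := time.Since(start).Seconds()
		// strconv.FormatFloat(ts, 'f', 2, 64): fixed-point notation, two decimals.
		fmt.Println("persist_cost =", strconv.FormatFloat(ts, 'f', 2, 64))
	}
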
@@ -480,6 +493,8 @@ func (p *ImportWrapper) isBinlogImport(filePaths []string) bool {
 
 // doBinlogImport is the entry of binlog import operation
 func (p *ImportWrapper) doBinlogImport(filePaths []string, tsStartPoint uint64, tsEndPoint uint64) error {
+	tr := timerecord.NewTimeRecorder("Import task")
+
 	flushFunc := func(fields map[storage.FieldID]storage.FieldData, shardID int) error {
 		printFieldsDataInfo(fields, "import wrapper: prepare to flush binlog data", filePaths)
 		return p.flushFunc(fields, shardID)
@@ -495,7 +510,7 @@ func (p *ImportWrapper) doBinlogImport(filePaths []string, tsStartPoint uint64,
 		return err
 	}
 
-	return p.reportPersisted(p.reportImportAttempts)
+	return p.reportPersisted(p.reportImportAttempts, tr)
 }
 
 // parseRowBasedJSON is the entry of row-based json import operation

@@ -1119,6 +1119,7 @@ func Test_ImportWrapperSplitFieldsData(t *testing.T) {
 
 func Test_ImportWrapperReportPersisted(t *testing.T) {
 	ctx := context.Background()
+	tr := timerecord.NewTimeRecorder("test")
 
 	importResult := &rootcoordpb.ImportResult{
 		Status: &commonpb.Status{
@@ -1143,15 +1144,16 @@ func Test_ImportWrapperReportPersisted(t *testing.T) {
 	assert.Nil(t, err)
 
 	// success
-	err = wrapper.reportPersisted(2)
+	err = wrapper.reportPersisted(2, tr)
 	assert.Nil(t, err)
+	assert.NotEmpty(t, wrapper.importResult.GetInfos())
 
 	// error when closing segments
 	wrapper.saveSegmentFunc = func(fieldsInsert []*datapb.FieldBinlog, fieldsStats []*datapb.FieldBinlog, segmentID int64, targetChName string, rowCount int64) error {
 		return errors.New("error")
 	}
 	wrapper.workingSegments[0] = &WorkingSegment{}
-	err = wrapper.reportPersisted(2)
+	err = wrapper.reportPersisted(2, tr)
 	assert.Error(t, err)
 
 	// failed to report
@@ -1161,6 +1163,6 @@ func Test_ImportWrapperReportPersisted(t *testing.T) {
 	wrapper.reportFunc = func(res *rootcoordpb.ImportResult) error {
 		return errors.New("error")
 	}
-	err = wrapper.reportPersisted(2)
+	err = wrapper.reportPersisted(2, tr)
 	assert.Error(t, err)
 }