milvus/internal/datacoord/import_task_preimport.go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"sync/atomic"
	"time"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/datacoord/session"
	"github.com/milvus-io/milvus/internal/json"
	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/metrics"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
	"github.com/milvus-io/milvus/pkg/v2/taskcommon"
	"github.com/milvus-io/milvus/pkg/v2/util/metricsinfo"
	"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

var _ ImportTask = (*preImportTask)(nil)
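
// preImportTask is the coordinator-side state for a single pre-import task.
// The underlying datapb.PreImportTask proto is held in an atomic.Pointer, so
// snapshot reads are lock-free and writers swap in a whole new proto.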
type preImportTask struct {
	task       atomic.Pointer[datapb.PreImportTask]
	importMeta ImportMeta
	tr         *timerecord.TimeRecorder
	times      *taskcommon.Times
	retryTimes int64
}
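
// The getters below read fields from the current task snapshot; they are
// safe for concurrent use because the proto pointer is loaded atomically.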
func (p *preImportTask) GetJobID() int64 {
	return p.task.Load().GetJobID()
}

func (p *preImportTask) GetTaskID() int64 {
	return p.task.Load().GetTaskID()
}

func (p *preImportTask) GetCollectionID() int64 {
	return p.task.Load().GetCollectionID()
}

func (p *preImportTask) GetNodeID() int64 {
	return p.task.Load().GetNodeID()
}

func (p *preImportTask) GetState() datapb.ImportTaskStateV2 {
	return p.task.Load().GetState()
}

func (p *preImportTask) GetReason() string {
	return p.task.Load().GetReason()
}

func (p *preImportTask) GetFileStats() []*datapb.ImportFileStats {
	return p.task.Load().GetFileStats()
}

func (p *preImportTask) GetCreatedTime() string {
	return p.task.Load().GetCreatedTime()
}

func (p *preImportTask) GetCompleteTime() string {
	return p.task.Load().GetCompleteTime()
}
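
// The methods below adapt the task to the taskcommon scheduling interface:
// task type, scheduler state, slot cost, schedule timestamps, and a task
// version derived from the retry count.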
func (p *preImportTask) GetTaskType() taskcommon.Type {
	return taskcommon.PreImport
}

func (p *preImportTask) GetTaskState() taskcommon.State {
	return taskcommon.FromImportState(p.GetState())
}

func (p *preImportTask) GetTaskSlot() int64 {
	return int64(CalculateTaskSlot(p, p.importMeta))
}

func (p *preImportTask) SetTaskTime(timeType taskcommon.TimeType, time time.Time) {
	p.times.SetTaskTime(timeType, time)
}

func (p *preImportTask) GetTaskTime(timeType taskcommon.TimeType) time.Time {
	return timeType.GetTaskTime(p.times)
}

func (p *preImportTask) GetTaskVersion() int64 {
	return p.retryTimes
}
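
// CreateTaskOnWorker dispatches the pending pre-import task to the given
// worker node. On RPC failure the retry count is bumped and the task stays
// pending for rescheduling; on success the task moves to InProgress and the
// time spent in the pending stage is recorded.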
func (p *preImportTask) CreateTaskOnWorker(nodeID int64, cluster session.Cluster) {
	log.Info("processing pending preimport task...", WrapTaskLog(p)...)
	job := p.importMeta.GetJob(context.TODO(), p.GetJobID())
	req := AssemblePreImportRequest(p, job)
	err := cluster.CreatePreImport(nodeID, req, p.GetTaskSlot())
	if err != nil {
		log.Warn("preimport failed", WrapTaskLog(p, zap.Error(err))...)
		p.retryTimes++
		return
	}
	err = p.importMeta.UpdateTask(context.TODO(), p.GetTaskID(),
		UpdateState(datapb.ImportTaskStateV2_InProgress),
		UpdateNodeID(nodeID))
	if err != nil {
		log.Warn("update import task failed", WrapTaskLog(p, zap.Error(err))...)
		return
	}
	pendingDuration := p.GetTR().RecordSpan()
	metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePending).Observe(float64(pendingDuration.Milliseconds()))
	log.Info("preimport task starts to execute", WrapTaskLog(p, zap.Int64("scheduledNodeID", nodeID), zap.Duration("taskTimeCost/pending", pendingDuration))...)
}
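
// QueryTaskOnWorker polls the assigned worker for pre-import progress. An RPC
// error or a Retry response resets the task to Pending so it can be
// rescheduled; a Failed response fails the whole job; otherwise the reported
// file stats are persisted and, on completion, the task is marked Completed.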
func (p *preImportTask) QueryTaskOnWorker(cluster session.Cluster) {
	req := &datapb.QueryPreImportRequest{
		JobID:  p.GetJobID(),
		TaskID: p.GetTaskID(),
	}
	resp, err := cluster.QueryPreImport(p.GetNodeID(), req)
	if err != nil || resp.GetState() == datapb.ImportTaskStateV2_Retry {
		updateErr := p.importMeta.UpdateTask(context.TODO(), p.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Pending))
		if updateErr != nil {
			log.Warn("failed to update preimport task state to pending", WrapTaskLog(p, zap.Error(updateErr))...)
		}
		log.Info("reset preimport task state to pending due to error", WrapTaskLog(p, zap.Error(err), zap.String("reason", resp.GetReason()))...)
		return
	}
	if resp.GetState() == datapb.ImportTaskStateV2_Failed {
		err = p.importMeta.UpdateJob(context.TODO(), p.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
			UpdateJobReason(resp.GetReason()))
		if err != nil {
			log.Warn("failed to update job state to Failed", zap.Int64("jobID", p.GetJobID()), zap.Error(err))
		}
		log.Warn("preimport failed", WrapTaskLog(p, zap.String("reason", resp.GetReason()))...)
		return
	}
	actions := []UpdateAction{}
	if resp.GetState() == datapb.ImportTaskStateV2_InProgress || resp.GetState() == datapb.ImportTaskStateV2_Completed {
		actions = append(actions, UpdateFileStats(resp.GetFileStats()))
	}
	if resp.GetState() == datapb.ImportTaskStateV2_Completed {
		actions = append(actions, UpdateState(datapb.ImportTaskStateV2_Completed))
	}
	if len(actions) > 0 {
		err = p.importMeta.UpdateTask(context.TODO(), p.GetTaskID(), actions...)
		if err != nil {
			log.Warn("update preimport task failed", WrapTaskLog(p, zap.Error(err))...)
			return
		}
	}
	log.Info("query preimport", WrapTaskLog(p, zap.String("respState", resp.GetState().String()),
		zap.Any("fileStats", resp.GetFileStats()))...)
	if resp.GetState() == datapb.ImportTaskStateV2_Completed {
		preimportDuration := p.GetTR().RecordSpan()
		metrics.ImportTaskLatency.WithLabelValues(metrics.ImportStagePreImport).Observe(float64(preimportDuration.Milliseconds()))
		log.Info("preimport done", WrapTaskLog(p, zap.Duration("taskTimeCost/preimport", preimportDuration))...)
	}
}
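
// DropTaskOnWorker asks the worker to release whatever this task holds;
// failures are only logged here.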
func (p *preImportTask) DropTaskOnWorker(cluster session.Cluster) {
	err := DropImportTask(p, cluster, p.importMeta)
	if err != nil {
		log.Warn("drop import failed", WrapTaskLog(p, zap.Error(err))...)
		return
	}
	log.Info("drop preimport task done", WrapTaskLog(p, zap.Int64("nodeID", p.GetNodeID()))...)
}
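
// GetType identifies this as a pre-import task, as opposed to the import
// task that follows it in the job lifecycle.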
func (p *preImportTask) GetType() TaskType {
	return PreImportTaskType
}

func (p *preImportTask) GetTR() *timerecord.TimeRecorder {
	return p.tr
}
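
// Clone returns a copy backed by a deep-cloned task proto; the importMeta,
// time recorder, and schedule times are shared with the original.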
func (p *preImportTask) Clone() ImportTask {
	cloned := &preImportTask{
		importMeta: p.importMeta,
		tr:         p.tr,
		times:      p.times,
	}
	cloned.task.Store(typeutil.Clone(p.task.Load()))
	return cloned
}
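
// GetSource reports that this task originated from a user import request.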
func (p *preImportTask) GetSource() datapb.ImportTaskSourceV2 {
	return datapb.ImportTaskSourceV2_Request
}
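
// MarshalJSON serializes the task into the metricsinfo.ImportTask shape used
// by the metrics endpoints.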
func (p *preImportTask) MarshalJSON() ([]byte, error) {
	importTask := metricsinfo.ImportTask{
		JobID:        p.GetJobID(),
		TaskID:       p.GetTaskID(),
		CollectionID: p.GetCollectionID(),
		NodeID:       p.GetNodeID(),
		State:        p.GetState().String(),
		Reason:       p.GetReason(),
		TaskType:     p.GetType().String(),
		CreatedTime:  p.GetCreatedTime(),
		CompleteTime: p.GetCompleteTime(),
	}
	return json.Marshal(importTask)
}