mirror of https://github.com/milvus-io/milvus.git
153 lines
4.1 KiB
Go
153 lines
4.1 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package datacoord
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
|
"github.com/milvus-io/milvus/internal/datacoord/allocator"
|
|
"github.com/milvus-io/milvus/internal/datacoord/task"
|
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
|
|
)
|
|
|
|
const (
|
|
NullNodeID = -1
|
|
)
|
|
|
|
type ImportInspector interface {
|
|
Start()
|
|
Close()
|
|
}
|
|
|
|
type importInspector struct {
|
|
ctx context.Context
|
|
meta *meta
|
|
alloc allocator.Allocator
|
|
importMeta ImportMeta
|
|
scheduler task.GlobalScheduler
|
|
|
|
closeOnce sync.Once
|
|
closeChan chan struct{}
|
|
}
|
|
|
|
func NewImportInspector(ctx context.Context, meta *meta, importMeta ImportMeta, scheduler task.GlobalScheduler) ImportInspector {
|
|
return &importInspector{
|
|
ctx: ctx,
|
|
meta: meta,
|
|
importMeta: importMeta,
|
|
scheduler: scheduler,
|
|
closeChan: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (s *importInspector) Start() {
|
|
s.reloadFromMeta()
|
|
log.Ctx(s.ctx).Info("start import inspector")
|
|
ticker := time.NewTicker(Params.DataCoordCfg.ImportScheduleInterval.GetAsDuration(time.Second))
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-s.closeChan:
|
|
log.Ctx(s.ctx).Info("import inspector exited")
|
|
return
|
|
case <-ticker.C:
|
|
s.inspect()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *importInspector) Close() {
|
|
s.closeOnce.Do(func() {
|
|
close(s.closeChan)
|
|
})
|
|
}
|
|
|
|
func (s *importInspector) reloadFromMeta() {
|
|
jobs := s.importMeta.GetJobBy(s.ctx)
|
|
sort.Slice(jobs, func(i, j int) bool {
|
|
return jobs[i].GetJobID() < jobs[j].GetJobID()
|
|
})
|
|
for _, job := range jobs {
|
|
tasks := s.importMeta.GetTaskBy(s.ctx, WithJob(job.GetJobID()))
|
|
for _, task := range tasks {
|
|
if task.GetState() == datapb.ImportTaskStateV2_InProgress {
|
|
s.scheduler.Enqueue(task)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *importInspector) inspect() {
|
|
jobs := s.importMeta.GetJobBy(s.ctx)
|
|
sort.Slice(jobs, func(i, j int) bool {
|
|
return jobs[i].GetJobID() < jobs[j].GetJobID()
|
|
})
|
|
for _, job := range jobs {
|
|
tasks := s.importMeta.GetTaskBy(s.ctx, WithJob(job.GetJobID()))
|
|
for _, task := range tasks {
|
|
switch task.GetState() {
|
|
case datapb.ImportTaskStateV2_Pending:
|
|
switch task.GetType() {
|
|
case PreImportTaskType:
|
|
s.processPendingPreImport(task)
|
|
case ImportTaskType:
|
|
s.processPendingImport(task)
|
|
}
|
|
case datapb.ImportTaskStateV2_Failed:
|
|
s.processFailed(task)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *importInspector) processPendingPreImport(task ImportTask) {
|
|
s.scheduler.Enqueue(task)
|
|
}
|
|
|
|
func (s *importInspector) processPendingImport(task ImportTask) {
|
|
s.scheduler.Enqueue(task)
|
|
}
|
|
|
|
func (s *importInspector) processFailed(task ImportTask) {
|
|
if task.GetType() == ImportTaskType {
|
|
originSegmentIDs := task.(*importTask).GetSegmentIDs()
|
|
statsSegmentIDs := task.(*importTask).GetSortedSegmentIDs()
|
|
segments := append(originSegmentIDs, statsSegmentIDs...)
|
|
for _, segment := range segments {
|
|
op := UpdateStatusOperator(segment, commonpb.SegmentState_Dropped)
|
|
err := s.meta.UpdateSegmentsInfo(s.ctx, op)
|
|
if err != nil {
|
|
log.Ctx(s.ctx).Warn("drop import segment failed", WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...)
|
|
return
|
|
}
|
|
}
|
|
if len(segments) > 0 {
|
|
err := s.importMeta.UpdateTask(s.ctx, task.GetTaskID(), UpdateSegmentIDs(nil), UpdateStatsSegmentIDs(nil))
|
|
if err != nil {
|
|
log.Ctx(s.ctx).Warn("update import task segments failed", WrapTaskLog(task, zap.Error(err))...)
|
|
}
|
|
}
|
|
}
|
|
}
|