milvus/internal/datacoord/import_inspector.go

153 lines
4.1 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"context"
"sort"
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/task"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
)
const (
NullNodeID = -1
)
type ImportInspector interface {
Start()
Close()
}
type importInspector struct {
ctx context.Context
meta *meta
alloc allocator.Allocator
importMeta ImportMeta
scheduler task.GlobalScheduler
closeOnce sync.Once
closeChan chan struct{}
}
func NewImportInspector(ctx context.Context, meta *meta, importMeta ImportMeta, scheduler task.GlobalScheduler) ImportInspector {
return &importInspector{
ctx: ctx,
meta: meta,
importMeta: importMeta,
scheduler: scheduler,
closeChan: make(chan struct{}),
}
}
func (s *importInspector) Start() {
s.reloadFromMeta()
log.Ctx(s.ctx).Info("start import inspector")
ticker := time.NewTicker(Params.DataCoordCfg.ImportScheduleInterval.GetAsDuration(time.Second))
defer ticker.Stop()
for {
select {
case <-s.closeChan:
log.Ctx(s.ctx).Info("import inspector exited")
return
case <-ticker.C:
s.inspect()
}
}
}
func (s *importInspector) Close() {
s.closeOnce.Do(func() {
close(s.closeChan)
})
}
func (s *importInspector) reloadFromMeta() {
jobs := s.importMeta.GetJobBy(s.ctx)
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].GetJobID() < jobs[j].GetJobID()
})
for _, job := range jobs {
tasks := s.importMeta.GetTaskBy(s.ctx, WithJob(job.GetJobID()))
for _, task := range tasks {
if task.GetState() == datapb.ImportTaskStateV2_InProgress {
s.scheduler.Enqueue(task)
}
}
}
}
func (s *importInspector) inspect() {
jobs := s.importMeta.GetJobBy(s.ctx)
sort.Slice(jobs, func(i, j int) bool {
return jobs[i].GetJobID() < jobs[j].GetJobID()
})
for _, job := range jobs {
tasks := s.importMeta.GetTaskBy(s.ctx, WithJob(job.GetJobID()))
for _, task := range tasks {
switch task.GetState() {
case datapb.ImportTaskStateV2_Pending:
switch task.GetType() {
case PreImportTaskType:
s.processPendingPreImport(task)
case ImportTaskType:
s.processPendingImport(task)
}
case datapb.ImportTaskStateV2_Failed:
s.processFailed(task)
}
}
}
}
func (s *importInspector) processPendingPreImport(task ImportTask) {
s.scheduler.Enqueue(task)
}
func (s *importInspector) processPendingImport(task ImportTask) {
s.scheduler.Enqueue(task)
}
func (s *importInspector) processFailed(task ImportTask) {
if task.GetType() == ImportTaskType {
originSegmentIDs := task.(*importTask).GetSegmentIDs()
statsSegmentIDs := task.(*importTask).GetSortedSegmentIDs()
segments := append(originSegmentIDs, statsSegmentIDs...)
for _, segment := range segments {
op := UpdateStatusOperator(segment, commonpb.SegmentState_Dropped)
err := s.meta.UpdateSegmentsInfo(s.ctx, op)
if err != nil {
log.Ctx(s.ctx).Warn("drop import segment failed", WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...)
return
}
}
if len(segments) > 0 {
err := s.importMeta.UpdateTask(s.ctx, task.GetTaskID(), UpdateSegmentIDs(nil), UpdateStatsSegmentIDs(nil))
if err != nil {
log.Ctx(s.ctx).Warn("update import task segments failed", WrapTaskLog(task, zap.Error(err))...)
}
}
}
}