// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"sync"
	"time"

	"github.com/cockroachdb/errors"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"

	"github.com/milvus-io/milvus/internal/datacoord/allocator"
	"github.com/milvus-io/milvus/internal/datacoord/task"
	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

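// StatsInspector drives segment stats tasks (text index, JSON key index,
// and BM25 sub-jobs): it re-enqueues pending work on startup, periodically
// submits new tasks, and garbage-collects finished ones.
//
// A minimal lifecycle sketch, assuming the collaborators are already built
// (variable names here are illustrative only):
//
//	si := newStatsInspector(ctx, mt, scheduler, alloc, handler, ci, ievm)
//	si.Start()
//	defer si.Stop()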
type StatsInspector interface {
	Start()
	Stop()
	SubmitStatsTask(originSegmentID, targetSegmentID int64, subJobType indexpb.StatsSubJob, canRecycle bool) error
	GetStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) *indexpb.StatsTask
	DropStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) error
}

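// compile-time check that statsInspector satisfies StatsInspector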
var _ StatsInspector = (*statsInspector)(nil)

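// statsInspector is the concrete StatsInspector. It owns two background
// loops (task triggering and cleanup) whose lifetimes are bound to
// ctx/cancel and tracked by loopWg.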
type statsInspector struct {
	ctx    context.Context
	cancel context.CancelFunc

	loopWg sync.WaitGroup

	mt *meta

	scheduler           task.GlobalScheduler
	allocator           allocator.Allocator
	handler             Handler
	compactionInspector CompactionInspector
	ievm                IndexEngineVersionManager
}

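// newStatsInspector wires a statsInspector to the meta table, the global
// task scheduler, and its other collaborators. The returned inspector is
// idle until Start is called.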
func newStatsInspector(ctx context.Context,
	mt *meta,
	scheduler task.GlobalScheduler,
	allocator allocator.Allocator,
	handler Handler,
	compactionInspector CompactionInspector,
	ievm IndexEngineVersionManager,
) *statsInspector {
	ctx, cancel := context.WithCancel(ctx)
	return &statsInspector{
		ctx:                 ctx,
		cancel:              cancel,
		loopWg:              sync.WaitGroup{},
		mt:                  mt,
		scheduler:           scheduler,
		allocator:           allocator,
		handler:             handler,
		compactionInspector: compactionInspector,
		ievm:                ievm,
	}
}

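// Start re-enqueues unfinished tasks from meta and launches the trigger
// and cleanup loops.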
func (si *statsInspector) Start() {
	si.reloadFromMeta()
	si.loopWg.Add(2)
	go si.triggerStatsTaskLoop()
	go si.cleanupStatsTasksLoop()
}

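// Stop cancels the inspector's context and blocks until both background
// loops have exited.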
func (si *statsInspector) Stop() {
	si.cancel()
	si.loopWg.Wait()
}

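// reloadFromMeta re-enqueues every persisted stats task still in the
// init/retry/in-progress states, e.g. after a DataCoord restart. Tasks
// whose segment is no longer healthy are enqueued with a zero task slot.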
func (si *statsInspector) reloadFromMeta() {
	tasks := si.mt.statsTaskMeta.GetAllTasks()
	for _, st := range tasks {
		if st.GetState() != indexpb.JobState_JobStateInit &&
			st.GetState() != indexpb.JobState_JobStateRetry &&
			st.GetState() != indexpb.JobState_JobStateInProgress {
			continue
		}
		segment := si.mt.GetHealthySegment(si.ctx, st.GetSegmentID())
		taskSlot := int64(0)
		if segment != nil {
			taskSlot = calculateStatsTaskSlot(segment.getSegmentSize())
		}
		si.scheduler.Enqueue(newStatsTask(
			proto.Clone(st).(*indexpb.StatsTask),
			taskSlot,
			si.mt,
			si.handler,
			si.allocator,
			si.ievm,
		))
	}
}

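// triggerStatsTaskLoop periodically scans all collections and submits text,
// BM25, and JSON key index stats tasks until the context is cancelled.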
func (si *statsInspector) triggerStatsTaskLoop() {
	log.Info("start triggerStatsTaskLoop...")
	defer si.loopWg.Done()

	ticker := time.NewTicker(Params.DataCoordCfg.TaskCheckInterval.GetAsDuration(time.Second))
	defer ticker.Stop()

	lastJSONStatsLastTrigger := time.Now().Unix()
	maxJSONStatsTaskCount := 0
	for {
		select {
		case <-si.ctx.Done():
			log.Warn("DataCoord context done, exit triggerStatsTaskLoop...")
			return
		case <-ticker.C:
			si.triggerTextStatsTask()
			si.triggerBM25StatsTask()
			lastJSONStatsLastTrigger, maxJSONStatsTaskCount = si.triggerJsonKeyIndexStatsTask(lastJSONStatsLastTrigger, maxJSONStatsTaskCount)
		}
	}
}

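// enableBM25 reports whether BM25 stats tasks should be triggered.
// It is hard-coded to false until the BM25 stats task is docked.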
func (si *statsInspector) enableBM25() bool {
	return false
}

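// needDoTextIndex returns true if the segment is flushed, sorted, non-L0,
// and is missing text stats logs for any of the given fields.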
func needDoTextIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool {
	if !isFlush(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 ||
		!segment.GetIsSorted() {
		return false
	}

	for _, fieldID := range fieldIDs {
		if segment.GetTextStatsLogs() == nil {
			return true
		}
		if segment.GetTextStatsLogs()[fieldID] == nil {
			return true
		}
	}
	return false
}

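// needDoJsonKeyIndex returns true if the segment is flushed, sorted, non-L0,
// and is missing JSON key stats for any of the given fields.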
func needDoJsonKeyIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool {
	if !isFlush(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 ||
		!segment.GetIsSorted() {
		return false
	}

	for _, fieldID := range fieldIDs {
		if segment.GetJsonKeyStats() == nil {
			return true
		}
		if segment.GetJsonKeyStats()[fieldID] == nil {
			return true
		}
	}
	return false
}

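// needDoBM25 reports whether a BM25 stats task is needed for the segment.
// Always false for now; see the TODO below.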
func needDoBM25(segment *SegmentInfo, fieldIDs []UniqueID) bool {
	// TODO: docking bm25 stats task
	return false
}

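// triggerTextStatsTask submits a text-index stats task for every sorted,
// flushed segment of every collection that has match-enabled fields still
// missing text stats.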
func (si *statsInspector) triggerTextStatsTask() {
	collections := si.mt.GetCollections()
	for _, collection := range collections {
		needTriggerFieldIDs := make([]UniqueID, 0)
		for _, field := range collection.Schema.GetFields() {
			// TODO @longjiquan: please replace this with fieldSchemaHelper.EnableMatch
			h := typeutil.CreateFieldSchemaHelper(field)
			if !h.EnableMatch() {
				continue
			}
			needTriggerFieldIDs = append(needTriggerFieldIDs, field.GetFieldID())
		}
		segments := si.mt.SelectSegments(si.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool {
			return seg.GetIsSorted() && needDoTextIndex(seg, needTriggerFieldIDs)
		}))

		for _, segment := range segments {
			if err := si.SubmitStatsTask(segment.GetID(), segment.GetID(), indexpb.StatsSubJob_TextIndexJob, true); err != nil {
				log.Warn("create stats task with text index for segment failed, wait for retry",
					zap.Int64("segmentID", segment.GetID()), zap.Error(err))
				continue
			}
		}
	}
}

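// triggerJsonKeyIndexStatsTask submits JSON key index stats tasks for
// segments that still lack them, rate-limited by a rolling window: once
// JSONStatsTriggerInterval has elapsed since lastJSONStatsLastTrigger the
// window resets, and at most JSONStatsTriggerCount tasks are submitted per
// window. The updated window start and in-window count are returned so the
// caller can carry them across ticks.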
func (si *statsInspector) triggerJsonKeyIndexStatsTask(lastJSONStatsLastTrigger int64, maxJSONStatsTaskCount int) (int64, int) {
	collections := si.mt.GetCollections()
	for _, collection := range collections {
		needTriggerFieldIDs := make([]UniqueID, 0)
		for _, field := range collection.Schema.GetFields() {
			h := typeutil.CreateFieldSchemaHelper(field)
			if h.EnableJSONKeyStatsIndex() && Params.CommonCfg.EnabledJSONKeyStats.GetAsBool() {
				needTriggerFieldIDs = append(needTriggerFieldIDs, field.GetFieldID())
			}
		}
		segments := si.mt.SelectSegments(si.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool {
			return needDoJsonKeyIndex(seg, needTriggerFieldIDs)
		}))
		if time.Now().Unix()-lastJSONStatsLastTrigger > int64(Params.DataCoordCfg.JSONStatsTriggerInterval.GetAsDuration(time.Minute).Seconds()) {
			lastJSONStatsLastTrigger = time.Now().Unix()
			maxJSONStatsTaskCount = 0
		}
		for _, segment := range segments {
			if maxJSONStatsTaskCount >= Params.DataCoordCfg.JSONStatsTriggerCount.GetAsInt() {
				break
			}
			if err := si.SubmitStatsTask(segment.GetID(), segment.GetID(), indexpb.StatsSubJob_JsonKeyIndexJob, true); err != nil {
				log.Warn("create stats task with json key index for segment failed, wait for retry",
					zap.Int64("segmentID", segment.GetID()), zap.Error(err))
				continue
			}
			maxJSONStatsTaskCount++
		}
	}
	return lastJSONStatsLastTrigger, maxJSONStatsTaskCount
}

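// triggerBM25StatsTask mirrors triggerTextStatsTask for BM25 stats. It is
// currently a no-op in effect, since enableBM25 always returns false.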
func (si *statsInspector) triggerBM25StatsTask() {
	collections := si.mt.GetCollections()
	for _, collection := range collections {
		needTriggerFieldIDs := make([]UniqueID, 0)
		for _, field := range collection.Schema.GetFields() {
			// TODO: docking bm25 stats task
			if si.enableBM25() {
				needTriggerFieldIDs = append(needTriggerFieldIDs, field.GetFieldID())
			}
		}
		segments := si.mt.SelectSegments(si.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool {
			return seg.GetIsSorted() && needDoBM25(seg, needTriggerFieldIDs)
		}))

		for _, segment := range segments {
			if err := si.SubmitStatsTask(segment.GetID(), segment.GetID(), indexpb.StatsSubJob_BM25Job, true); err != nil {
				log.Warn("create stats task with bm25 for segment failed, wait for retry",
					zap.Int64("segmentID", segment.GetID()), zap.Error(err))
				continue
			}
		}
	}
}

// cleanupStatsTasksLoop periodically cleans up finished/failed stats tasks.
func (si *statsInspector) cleanupStatsTasksLoop() {
	log.Info("start cleanupStatsTasksLoop...")
	defer si.loopWg.Done()

	ticker := time.NewTicker(Params.DataCoordCfg.GCInterval.GetAsDuration(time.Second))
	defer ticker.Stop()

	for {
		select {
		case <-si.ctx.Done():
			log.Warn("DataCoord context done, exit cleanupStatsTasksLoop...")
			return
		case <-ticker.C:
			start := time.Now()
			log.Info("start cleanupUnusedStatsTasks...", zap.Time("startAt", start))

			taskIDs := si.mt.statsTaskMeta.CanCleanedTasks()
			for _, taskID := range taskIDs {
				if err := si.mt.statsTaskMeta.DropStatsTask(si.ctx, taskID); err != nil {
					// ignore the error; if removal fails, retry on the next GC tick
					log.Warn("clean up stats task failed", zap.Int64("taskID", taskID), zap.Error(err))
				}
			}
			log.Info("cleanupUnusedStatsTasks done", zap.Duration("timeCost", time.Since(start)))
		}
	}
}

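// SubmitStatsTask allocates a task ID, persists a new stats task for the
// origin segment, and enqueues it on the global scheduler. JSON key index
// jobs reserve slots as if the segment were twice its size. Submitting a
// duplicate task is treated as success.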
func (si *statsInspector) SubmitStatsTask(originSegmentID, targetSegmentID int64,
	subJobType indexpb.StatsSubJob, canRecycle bool,
) error {
	originSegment := si.mt.GetHealthySegment(si.ctx, originSegmentID)
	if originSegment == nil {
		return merr.WrapErrSegmentNotFound(originSegmentID)
	}
	taskID, err := si.allocator.AllocID(context.Background())
	if err != nil {
		return err
	}
	originSegmentSize := originSegment.getSegmentSize()
	if subJobType == indexpb.StatsSubJob_JsonKeyIndexJob {
		originSegmentSize = originSegment.getSegmentSize() * 2
	}

	taskSlot := calculateStatsTaskSlot(originSegmentSize)
	t := &indexpb.StatsTask{
		CollectionID:    originSegment.GetCollectionID(),
		PartitionID:     originSegment.GetPartitionID(),
		SegmentID:       originSegmentID,
		InsertChannel:   originSegment.GetInsertChannel(),
		TaskID:          taskID,
		Version:         0,
		NodeID:          0,
		State:           indexpb.JobState_JobStateInit,
		FailReason:      "",
		TargetSegmentID: targetSegmentID,
		SubJobType:      subJobType,
		CanRecycle:      canRecycle,
	}
	if err = si.mt.statsTaskMeta.AddStatsTask(t); err != nil {
		if errors.Is(err, merr.ErrTaskDuplicate) {
			log.RatedInfo(10, "stats task already exists", zap.Int64("taskID", taskID),
				zap.Int64("collectionID", originSegment.GetCollectionID()),
				zap.Int64("segmentID", originSegment.GetID()))
			return nil
		}
		return err
	}
	si.scheduler.Enqueue(newStatsTask(proto.Clone(t).(*indexpb.StatsTask), taskSlot, si.mt, si.handler, si.allocator, si.ievm))
	log.Ctx(si.ctx).Info("submit stats task success", zap.Int64("taskID", taskID),
		zap.String("subJobType", subJobType.String()),
		zap.Int64("collectionID", originSegment.GetCollectionID()),
		zap.Int64("originSegmentID", originSegmentID),
		zap.Int64("targetSegmentID", targetSegmentID), zap.Int64("taskSlot", taskSlot))
	return nil
}

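// GetStatsTask returns the stats task of the given sub-job type for the
// segment, or nil if none exists (the proto getters used in the log call
// are nil-safe).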
func (si *statsInspector) GetStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) *indexpb.StatsTask {
	task := si.mt.statsTaskMeta.GetStatsTaskBySegmentID(originSegmentID, subJobType)
	log.Info("statsJobManager get stats task state", zap.Int64("segmentID", originSegmentID),
		zap.String("subJobType", subJobType.String()), zap.String("state", task.GetState().String()),
		zap.String("failReason", task.GetFailReason()))
	return task
}

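// DropStatsTask aborts any running task for the segment/sub-job pair and
// marks it recyclable so the cleanup loop can remove it. Dropping a
// missing task is a no-op.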
func (si *statsInspector) DropStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) error {
	task := si.mt.statsTaskMeta.GetStatsTaskBySegmentID(originSegmentID, subJobType)
	if task == nil {
		return nil
	}
	si.scheduler.AbortAndRemoveTask(task.GetTaskID())
	if err := si.mt.statsTaskMeta.MarkTaskCanRecycle(task.GetTaskID()); err != nil {
		return err
	}

	log.Info("statsJobManager drop stats task success", zap.Int64("segmentID", originSegmentID),
		zap.Int64("taskID", task.GetTaskID()), zap.String("subJobType", subJobType.String()))
	return nil
}