milvus/internal/datacoord/task_stats.go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"fmt"
	"time"

	"github.com/cockroachdb/errors"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/indexpb"
	"github.com/milvus-io/milvus/internal/proto/workerpb"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
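
// startStatsTasksCheckLoop starts the background loops that create stats tasks
// for flushed segments and clean up finished/failed stats tasks.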
func (s *Server) startStatsTasksCheckLoop(ctx context.Context) {
s.serverLoopWg.Add(2)
go s.checkStatsTaskLoop(ctx)
go s.cleanupStatsTasksLoop(ctx)
}
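
// checkStatsTaskLoop periodically scans flushed, unsorted segments and creates
// stats tasks for them; it also handles newly flushed segments delivered via statsCh.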
func (s *Server) checkStatsTaskLoop(ctx context.Context) {
log.Info("start checkStatsTaskLoop...")
defer s.serverLoopWg.Done()
ticker := time.NewTicker(Params.DataCoordCfg.TaskCheckInterval.GetAsDuration(time.Second))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Warn("DataCoord context done, exit checkStatsTaskLoop...")
return
case <-ticker.C:
if Params.DataCoordCfg.EnableStatsTask.GetAsBool() {
segments := s.meta.SelectSegments(SegmentFilterFunc(func(seg *SegmentInfo) bool {
return isFlush(seg) && seg.GetLevel() != datapb.SegmentLevel_L0 && !seg.GetIsSorted() && !seg.isCompacting
}))
for _, segment := range segments {
if err := s.createStatsSegmentTask(segment); err != nil {
log.Warn("create stats task for segment failed, wait for retry",
zap.Int64("segmentID", segment.GetID()), zap.Error(err))
continue
}
}
}
case segID := <-s.statsCh:
log.Info("receive new flushed segment", zap.Int64("segmentID", segID))
segment := s.meta.GetSegment(segID)
if segment == nil {
log.Warn("segment is not exist, no need to do stats task", zap.Int64("segmentID", segID))
continue
}
// TODO @xiaocai2333: remove code after allow create stats task for importing segment
if segment.GetIsImporting() {
log.Info("segment is importing, skip stats task", zap.Int64("segmentID", segID))
select {
case s.buildIndexCh <- segID:
default:
}
continue
}
if err := s.createStatsSegmentTask(segment); err != nil {
log.Warn("create stats task for segment failed, wait for retry",
zap.Int64("segmentID", segment.ID), zap.Error(err))
continue
}
}
}
}
// cleanupStatsTasksLoop periodically cleans up the finished/failed stats tasks
func (s *Server) cleanupStatsTasksLoop(ctx context.Context) {
log.Info("start cleanupStatsTasksLoop...")
defer s.serverLoopWg.Done()
ticker := time.NewTicker(Params.DataCoordCfg.GCInterval.GetAsDuration(time.Second))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Warn("DataCoord context done, exit cleanupStatsTasksLoop...")
return
case <-ticker.C:
start := time.Now()
log.Info("start cleanupUnusedStatsTasks...", zap.Time("startAt", start))
taskIDs := s.meta.statsTaskMeta.CanCleanedTasks()
for _, taskID := range taskIDs {
if err := s.meta.statsTaskMeta.RemoveStatsTaskByTaskID(taskID); err != nil {
// ignore the error; if removal fails, it will be retried in the next GC cycle
log.Warn("clean up stats task failed", zap.Int64("taskID", taskID), zap.Error(err))
}
}
log.Info("recycleUnusedStatsTasks done", zap.Duration("timeCost", time.Since(start)))
}
}
}
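
// createStatsSegmentTask allocates a task ID and a target segment ID, persists the
// stats task in meta (ignoring duplicates), and enqueues it to the task scheduler.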
func (s *Server) createStatsSegmentTask(segment *SegmentInfo) error {
if segment.GetIsSorted() || segment.GetIsImporting() {
// TODO @xiaocai2333: allow importing segment stats
log.Info("segment is sorted by segmentID", zap.Int64("segmentID", segment.GetID()))
return nil
}
start, _, err := s.allocator.AllocN(2)
if err != nil {
return err
}
t := &indexpb.StatsTask{
CollectionID: segment.GetCollectionID(),
PartitionID: segment.GetPartitionID(),
SegmentID: segment.GetID(),
InsertChannel: segment.GetInsertChannel(),
TaskID: start,
Version: 0,
NodeID: 0,
State: indexpb.JobState_JobStateInit,
FailReason: "",
TargetSegmentID: start + 1,
}
if err = s.meta.statsTaskMeta.AddStatsTask(t); err != nil {
if errors.Is(err, merr.ErrTaskDuplicate) {
return nil
}
return err
}
s.taskScheduler.enqueue(newStatsTask(t.GetTaskID(), t.GetSegmentID(), t.GetTargetSegmentID(), s.buildIndexCh))
return nil
}
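
// statsTask tracks a single stats job: the source segment, the target segment produced
// by the job, the worker node it is assigned to, and the result reported back.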
type statsTask struct {
taskID int64
segmentID int64
targetSegmentID int64
nodeID int64
taskInfo *workerpb.StatsResult
queueTime time.Time
startTime time.Time
endTime time.Time
req *workerpb.CreateStatsRequest
buildIndexCh chan UniqueID
}
var _ Task = (*statsTask)(nil)
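
// newStatsTask creates a statsTask in the JobStateInit state.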
func newStatsTask(taskID int64, segmentID, targetSegmentID int64, buildIndexCh chan UniqueID) *statsTask {
return &statsTask{
taskID: taskID,
segmentID: segmentID,
targetSegmentID: targetSegmentID,
taskInfo: &workerpb.StatsResult{
TaskID: taskID,
State: indexpb.JobState_JobStateInit,
},
buildIndexCh: buildIndexCh,
}
}
func (st *statsTask) setResult(result *workerpb.StatsResult) {
st.taskInfo = result
}
func (st *statsTask) GetTaskID() int64 {
return st.taskID
}
func (st *statsTask) GetNodeID() int64 {
return st.nodeID
}
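
// ResetTask clears the assigned node and releases the segment's compacting flag.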
func (st *statsTask) ResetTask(mt *meta) {
st.nodeID = 0
// reset isCompacting
mt.SetSegmentsCompacting([]UniqueID{st.segmentID}, false)
}
func (st *statsTask) SetQueueTime(t time.Time) {
st.queueTime = t
}
func (st *statsTask) GetQueueTime() time.Time {
return st.queueTime
}
func (st *statsTask) SetStartTime(t time.Time) {
st.startTime = t
}
func (st *statsTask) GetStartTime() time.Time {
return st.startTime
}
func (st *statsTask) SetEndTime(t time.Time) {
st.endTime = t
}
func (st *statsTask) GetEndTime() time.Time {
return st.endTime
}
func (st *statsTask) GetTaskType() string {
return indexpb.JobType_JobTypeStatsJob.String()
}
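
// CheckTaskHealthy reports whether the source segment still exists and is healthy.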
func (st *statsTask) CheckTaskHealthy(mt *meta) bool {
seg := mt.GetHealthySegment(st.segmentID)
return seg != nil
}
func (st *statsTask) SetState(state indexpb.JobState, failReason string) {
st.taskInfo.State = state
st.taskInfo.FailReason = failReason
}
func (st *statsTask) GetState() indexpb.JobState {
return st.taskInfo.GetState()
}
func (st *statsTask) GetFailReason() string {
return st.taskInfo.GetFailReason()
}
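
// UpdateVersion marks the segment as compacting to block concurrent compaction,
// then bumps the task version in meta.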
func (st *statsTask) UpdateVersion(ctx context.Context, meta *meta) error {
// mark compacting
if exist, canDo := meta.CheckAndSetSegmentsCompacting([]UniqueID{st.segmentID}); !exist || !canDo {
log.Warn("segment is not exist or is compacting, skip stats",
zap.Bool("exist", exist), zap.Bool("canDo", canDo))
st.SetState(indexpb.JobState_JobStateNone, "segment is not healthy")
return fmt.Errorf("mark segment compacting failed, isCompacting: %v", !canDo)
}
return meta.statsTaskMeta.UpdateVersion(st.taskID)
}
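
// UpdateMetaBuildingState records the assigned node and moves the task to the
// building (in-progress) state in meta.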
func (st *statsTask) UpdateMetaBuildingState(nodeID int64, meta *meta) error {
st.nodeID = nodeID
return meta.statsTaskMeta.UpdateBuildingTask(st.taskID, nodeID)
}
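
// PreCheck verifies that the segment is healthy and not yet sorted, fetches the
// collection schema and TTL, pre-allocates log IDs, and builds the CreateStatsRequest.
// It returns false if the task should not proceed.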
func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool {
// check segment and collection state before building the stats request
log := log.Ctx(ctx).With(zap.Int64("taskID", st.taskID), zap.Int64("segmentID", st.segmentID))
segment := dependency.meta.GetHealthySegment(st.segmentID)
if segment == nil {
log.Warn("segment is node healthy, skip stats")
st.SetState(indexpb.JobState_JobStateNone, "segment is not healthy")
return false
}
if segment.GetIsSorted() {
log.Info("stats task is marked as sorted, skip stats")
st.SetState(indexpb.JobState_JobStateNone, "segment is marked as sorted")
return false
}
collInfo, err := dependency.handler.GetCollection(ctx, segment.GetCollectionID())
if err != nil {
log.Warn("stats task get collection info failed", zap.Int64("collectionID",
segment.GetCollectionID()), zap.Error(err))
st.SetState(indexpb.JobState_JobStateInit, err.Error())
return false
}
collTtl, err := getCollectionTTL(collInfo.Properties)
if err != nil {
log.Warn("stats task get collection ttl failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err))
st.SetState(indexpb.JobState_JobStateInit, err.Error())
return false
}
// pre-allocate log IDs for the binlogs/statslogs that the stats task will write
logIDNum := segment.getSegmentSize() / Params.DataNodeCfg.BinLogMaxSize.GetAsInt64() * int64(len(collInfo.Schema.GetFields())) * 2
start, end, err := dependency.allocator.AllocN(logIDNum)
if err != nil {
log.Warn("stats task alloc logID failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err))
st.SetState(indexpb.JobState_JobStateInit, err.Error())
return false
}
st.req = &workerpb.CreateStatsRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
TaskID: st.GetTaskID(),
CollectionID: segment.GetCollectionID(),
PartitionID: segment.GetPartitionID(),
InsertChannel: segment.GetInsertChannel(),
SegmentID: segment.GetID(),
InsertLogs: segment.GetBinlogs(),
DeltaLogs: segment.GetDeltalogs(),
StorageConfig: createStorageConfig(),
Schema: collInfo.Schema,
TargetSegmentID: st.targetSegmentID,
StartLogID: start,
EndLogID: end,
NumRows: segment.GetNumOfRows(),
CollectionTtl: collTtl.Nanoseconds(),
CurrentTs: tsoutil.GetCurrentTime(),
BinlogMaxSize: Params.DataNodeCfg.BinLogMaxSize.GetAsUint64(),
}
return true
}
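
// AssignTask dispatches the stats job to the worker via CreateJobV2 and marks the
// task InProgress on success, or Retry on failure.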
func (st *statsTask) AssignTask(ctx context.Context, client types.IndexNodeClient) bool {
ctx, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
resp, err := client.CreateJobV2(ctx, &workerpb.CreateJobV2Request{
ClusterID: st.req.GetClusterID(),
TaskID: st.req.GetTaskID(),
JobType: indexpb.JobType_JobTypeStatsJob,
Request: &workerpb.CreateJobV2Request_StatsRequest{
StatsRequest: st.req,
},
})
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Ctx(ctx).Warn("assign stats task failed", zap.Int64("taskID", st.taskID),
zap.Int64("segmentID", st.segmentID), zap.Error(err))
st.SetState(indexpb.JobState_JobStateRetry, err.Error())
return false
}
log.Ctx(ctx).Info("assign stats task success", zap.Int64("taskID", st.taskID), zap.Int64("segmentID", st.segmentID))
st.SetState(indexpb.JobState_JobStateInProgress, "")
return true
}
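
// QueryResult queries the worker for the task result and updates the task state;
// a missing result or a none state moves the task to Retry.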
func (st *statsTask) QueryResult(ctx context.Context, client types.IndexNodeClient) {
resp, err := client.QueryJobsV2(ctx, &workerpb.QueryJobsV2Request{
ClusterID: st.req.GetClusterID(),
TaskIDs: []int64{st.GetTaskID()},
JobType: indexpb.JobType_JobTypeStatsJob,
})
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Ctx(ctx).Warn("query stats task result failed", zap.Int64("taskID", st.GetTaskID()),
zap.Int64("segmentID", st.segmentID), zap.Error(err))
st.SetState(indexpb.JobState_JobStateRetry, err.Error())
return
}
for _, result := range resp.GetStatsJobResults().GetResults() {
if result.GetTaskID() == st.GetTaskID() {
log.Ctx(ctx).Info("query stats task result success", zap.Int64("taskID", st.GetTaskID()),
zap.Int64("segmentID", st.segmentID), zap.String("result state", result.GetState().String()),
zap.String("failReason", result.GetFailReason()))
if result.GetState() == indexpb.JobState_JobStateFinished || result.GetState() == indexpb.JobState_JobStateRetry ||
result.GetState() == indexpb.JobState_JobStateFailed {
st.setResult(result)
} else if result.GetState() == indexpb.JobState_JobStateNone {
st.SetState(indexpb.JobState_JobStateRetry, "stats task state is none in info response")
}
// inProgress or unissued/init, keep InProgress state
return
}
}
log.Ctx(ctx).Warn("query stats task result failed, indexNode does not have task info",
zap.Int64("taskID", st.GetTaskID()), zap.Int64("segmentID", st.segmentID))
st.SetState(indexpb.JobState_JobStateRetry, "stats task is not in info response")
}
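
// DropTaskOnWorker notifies the worker to drop the stats job and reports whether
// the drop succeeded.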
func (st *statsTask) DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool {
resp, err := client.DropJobsV2(ctx, &workerpb.DropJobsV2Request{
ClusterID: st.req.GetClusterID(),
TaskIDs: []int64{st.GetTaskID()},
JobType: indexpb.JobType_JobTypeStatsJob,
})
if err := merr.CheckRPCCall(resp, err); err != nil {
log.Ctx(ctx).Warn("notify worker drop the stats task failed", zap.Int64("taskID", st.GetTaskID()),
zap.Int64("segmentID", st.segmentID), zap.Error(err))
return false
}
log.Ctx(ctx).Info("drop stats task success", zap.Int64("taskID", st.GetTaskID()),
zap.Int64("segmentID", st.segmentID))
return true
}
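
// SetJobInfo persists the stats result: it saves the result segment, marks the task
// finished in meta, commits the metric mutation, and notifies buildIndexCh about the
// new sorted segment.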
func (st *statsTask) SetJobInfo(meta *meta) error {
// first update segment
metricMutation, err := meta.SaveStatsResultSegment(st.segmentID, st.taskInfo)
if err != nil {
log.Warn("save stats result failed", zap.Int64("taskID", st.taskID),
zap.Int64("segmentID", st.segmentID), zap.Error(err))
return err
}
// second update the task meta
if err = meta.statsTaskMeta.FinishTask(st.taskID, st.taskInfo); err != nil {
log.Warn("save stats result failed", zap.Int64("taskID", st.taskID), zap.Error(err))
return err
}
metricMutation.commit()
log.Info("SetJobInfo for stats task success", zap.Int64("taskID", st.taskID),
zap.Int64("oldSegmentID", st.segmentID), zap.Int64("targetSegmentID", st.taskInfo.GetSegmentID()))
if st.buildIndexCh != nil {
select {
case st.buildIndexCh <- st.taskInfo.GetSegmentID():
default:
}
}
return nil
}