// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"fmt"
	"path"
	"strconv"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/samber/lo"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/internal/datacoord/allocator"
	"github.com/milvus-io/milvus/internal/datacoord/session"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/indexpb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/metautil"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)
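
// Compile-time assertion that clusteringCompactionTask implements the CompactionTask interface.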
var _ CompactionTask = (*clusteringCompactionTask)(nil)

type clusteringCompactionTask struct {
	*datapb.CompactionTask
	plan   *datapb.CompactionPlan
	result *datapb.CompactionPlanResult

	span             trace.Span
	allocator        allocator.Allocator
	meta             CompactionMeta
	sessions         session.DataNodeManager
	handler          Handler
	analyzeScheduler *taskScheduler
	maxRetryTimes    int32
	slotUsage        int64
}

func newClusteringCompactionTask(t *datapb.CompactionTask, allocator allocator.Allocator, meta CompactionMeta, session session.DataNodeManager, handler Handler, analyzeScheduler *taskScheduler) *clusteringCompactionTask {
	return &clusteringCompactionTask{
		CompactionTask:   t,
		allocator:        allocator,
		meta:             meta,
		sessions:         session,
		handler:          handler,
		analyzeScheduler: analyzeScheduler,
		maxRetryTimes:    3,
		slotUsage:        paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
	}
}
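
// Process drives the task's state machine one step. It delegates the actual state
// transition to retryableProcess and, on error, either schedules a retry or marks the
// task failed. It returns true once the task reaches a terminal state (completed or cleaned).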
func (t *clusteringCompactionTask) Process() bool {
	log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
	lastState := t.GetState().String()
	err := t.retryableProcess()
	if err != nil {
		log.Warn("failed to process task", zap.Error(err))
		if merr.IsRetryableErr(err) && t.RetryTimes < t.maxRetryTimes {
			// retry in next Process
			err = t.updateAndSaveTaskMeta(setRetryTimes(t.RetryTimes + 1))
		} else {
			log.Error("task failed with an unretryable reason or reached max retry times", zap.Error(err))
			err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
		}
		if err != nil {
			log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
		}
	}
	// on a state change, reset the retry counter and record state-transition metrics
	currentState := t.State.String()
	if currentState != lastState {
		ts := time.Now().Unix()
		lastStateDuration := ts - t.GetLastStateStartTime()
		log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
		metrics.DataCoordCompactionLatency.
			WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
			Observe(float64(lastStateDuration * 1000))
		updateOps := []compactionTaskOpt{setRetryTimes(0), setLastStateStartTime(ts)}

		if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned {
			updateOps = append(updateOps, setEndTime(ts))
			elapse := ts - t.StartTime
			log.Info("clustering compaction task total elapse", zap.Int64("elapse seconds", elapse))
			metrics.DataCoordCompactionLatency.
				WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), "total").
				Observe(float64(elapse * 1000))
		}
		err = t.updateAndSaveTaskMeta(updateOps...)
		if err != nil {
			log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
		}
	}
	log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
	return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned
}

// retryableProcess handles the task's state transitions and returns an error when a step
// does not work as expected. The outer Process sets the state and retry count according
// to whether the returned error is retryable.
func (t *clusteringCompactionTask) retryableProcess() error {
	if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned {
		return nil
	}

	coll, err := t.handler.GetCollection(context.Background(), t.GetCollectionID())
	if err != nil {
		// retryable
		log.Warn("fail to get collection", zap.Int64("collectionID", t.GetCollectionID()), zap.Error(err))
		return merr.WrapErrClusteringCompactionGetCollectionFail(t.GetCollectionID(), err)
	}
	if coll == nil {
		// not retryable: fail fast if the collection was dropped
		log.Warn("collection not found, it may be dropped, stop clustering compaction task", zap.Int64("collectionID", t.GetCollectionID()))
		return merr.WrapErrCollectionNotFound(t.GetCollectionID())
	}

	switch t.State {
	case datapb.CompactionTaskState_pipelining:
		return t.processPipelining()
	case datapb.CompactionTaskState_executing:
		return t.processExecuting()
	case datapb.CompactionTaskState_analyzing:
		return t.processAnalyzing()
	case datapb.CompactionTaskState_meta_saved:
		return t.processMetaSaved()
	case datapb.CompactionTaskState_indexing:
		return t.processIndexing()
	case datapb.CompactionTaskState_statistic:
		return t.processStats()
	case datapb.CompactionTaskState_timeout:
		return t.processFailedOrTimeout()
	case datapb.CompactionTaskState_failed:
		return t.processFailedOrTimeout()
	}
	return nil
}
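
// BuildCompactionRequest assembles the CompactionPlan sent to the DataNode: it
// pre-allocates a begin log ID, wires in the analyze result path and the pre-allocated
// segment ID range, and collects the binlog metadata of every healthy input segment.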
func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
	beginLogID, _, err := t.allocator.AllocN(1)
	if err != nil {
		return nil, err
	}
	plan := &datapb.CompactionPlan{
		PlanID:             t.GetPlanID(),
		StartTime:          t.GetStartTime(),
		TimeoutInSeconds:   t.GetTimeoutInSeconds(),
		Type:               t.GetType(),
		Channel:            t.GetChannel(),
		CollectionTtl:      t.GetCollectionTtl(),
		TotalRows:          t.GetTotalRows(),
		Schema:             t.GetSchema(),
		ClusteringKeyField: t.GetClusteringKeyField().GetFieldID(),
		MaxSegmentRows:     t.GetMaxSegmentRows(),
		PreferSegmentRows:  t.GetPreferSegmentRows(),
		AnalyzeResultPath:  path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)),
		AnalyzeSegmentIds:  t.GetInputSegments(),
		BeginLogID:         beginLogID,
		PreAllocatedSegmentIDs: &datapb.IDRange{
			Begin: t.GetResultSegments()[0],
			End:   t.GetResultSegments()[1],
		},
		SlotUsage: t.GetSlotUsage(),
	}
	log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

	for _, segID := range t.GetInputSegments() {
		segInfo := t.meta.GetHealthySegment(segID)
		if segInfo == nil {
			return nil, merr.WrapErrSegmentNotFound(segID)
		}
		plan.SegmentBinlogs = append(plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
			SegmentID:           segID,
			CollectionID:        segInfo.GetCollectionID(),
			PartitionID:         segInfo.GetPartitionID(),
			Level:               segInfo.GetLevel(),
			InsertChannel:       segInfo.GetInsertChannel(),
			FieldBinlogs:        segInfo.GetBinlogs(),
			Field2StatslogPaths: segInfo.GetStatslogs(),
			Deltalogs:           segInfo.GetDeltalogs(),
			IsSorted:            segInfo.GetIsSorted(),
		})
	}
	log.Info("Compaction handler build clustering compaction plan")
	return plan, nil
}
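
// processPipelining waits for a node assignment, promotes all input segments to L2,
// then submits an analyze task for vector clustering keys or goes straight to
// compaction for scalar ones.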
func (t *clusteringCompactionTask) processPipelining() error {
	log := log.With(zap.Int64("triggerID", t.TriggerID), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("planID", t.GetPlanID()))
	if t.NeedReAssignNodeID() {
		log.Debug("wait for the node to be assigned before proceeding with the subsequent steps")
		return nil
	}
	var operators []UpdateOperator
	for _, segID := range t.InputSegments {
		operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L2))
	}
	err := t.meta.UpdateSegmentsInfo(operators...)
	if err != nil {
		log.Warn("fail to set segment level to L2", zap.Error(err))
		return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo before compaction executing", err)
	}

	if typeutil.IsVectorType(t.GetClusteringKeyField().DataType) {
		err := t.doAnalyze()
		if err != nil {
			log.Warn("fail to submit analyze task", zap.Error(err))
			return merr.WrapErrClusteringCompactionSubmitTaskFail("analyze", err)
		}
	} else {
		err := t.doCompact()
		if err != nil {
			log.Warn("fail to submit compaction task", zap.Error(err))
			return merr.WrapErrClusteringCompactionSubmitTaskFail("compact", err)
		}
	}
	return nil
}
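
// processExecuting polls the DataNode for the plan result and advances the task
// according to the reported state, handling node loss, timeout, and failure.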
func (t *clusteringCompactionTask) processExecuting() error {
	log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
	result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
	if err != nil || result == nil {
		if errors.Is(err, merr.ErrNodeNotFound) {
			log.Warn("GetCompactionPlanResult fail", zap.Error(err))
			// setNodeID(NullNodeID) to trigger reassign node ID
			return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
		}
		return err
	}
	log.Info("compaction result", zap.Any("result", result.String()))
	switch result.GetState() {
	case datapb.CompactionTaskState_completed:
		t.result = result
		result := t.result
		if len(result.GetSegments()) == 0 {
			log.Warn("illegal compaction results, this should not happen")
			return merr.WrapErrCompactionResult("compaction result is empty")
		}
		resultSegmentIDs := lo.Map(result.Segments, func(segment *datapb.CompactionSegment, _ int) int64 {
			return segment.GetSegmentID()
		})
		_, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result)
		if err != nil {
			return err
		}
		metricMutation.commit()
		err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setTmpSegments(resultSegmentIDs))
		if err != nil {
			return err
		}
		return t.processMetaSaved()
	case datapb.CompactionTaskState_executing:
		if t.checkTimeout() {
			err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
			if err == nil {
				return t.processFailedOrTimeout()
			}
			return err
		}
		return nil
	case datapb.CompactionTaskState_failed:
		return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
	default:
		log.Error("unsupported compaction task state", zap.String("state", result.GetState().String()))
		return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
	}
}
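
// processMetaSaved advances the task to the statistic state once the compaction
// result has been persisted.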
func (t *clusteringCompactionTask) processMetaSaved() error {
	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_statistic))
}
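
// processStats maps each temporary segment to the result segments produced by its
// stats task (when stats tasks are enabled), regenerates the partition stats mapping,
// and then moves the task to the indexing state.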
func (t *clusteringCompactionTask) processStats() error {
	// This step only mutates in-memory state: if the coordinator crashes here, the state
	// recovered from meta is still CompactionTaskState_statistic.
	resultSegments := make([]int64, 0, len(t.GetTmpSegments()))
	if Params.DataCoordCfg.EnableStatsTask.GetAsBool() {
		existNonStats := false
		tmpToResultSegments := make(map[int64][]int64, len(t.GetTmpSegments()))
		for _, segmentID := range t.GetTmpSegments() {
			to, ok := t.meta.(*meta).GetCompactionTo(segmentID)
			if !ok || to == nil {
				select {
				case getStatsTaskChSingleton() <- segmentID:
				default:
				}
				existNonStats = true
				continue
			}
			tmpToResultSegments[segmentID] = lo.Map(to, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })
			resultSegments = append(resultSegments, lo.Map(to, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...)
		}
		if existNonStats {
			return nil
		}

		if err := t.regeneratePartitionStats(tmpToResultSegments); err != nil {
			log.Warn("regenerate partition stats failed, wait for retry", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
			return merr.WrapErrClusteringCompactionMetaError("regeneratePartitionStats", err)
		}
	} else {
		log.Info("stats task is not enabled, set tmp segments to result segments", zap.Int64("planID", t.GetPlanID()))
		resultSegments = t.GetTmpSegments()
	}

	log.Info("clustering compaction stats task finished", zap.Int64("planID", t.GetPlanID()),
		zap.Int64s("tmp segments", t.GetTmpSegments()),
		zap.Int64s("result segments", resultSegments))
	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_indexing), setResultSegments(resultSegments))
}

// This is a temporary solution. Longer term, the indexnode should regenerate the
// clustering information for each segment and merge it at the vshard level.
func (t *clusteringCompactionTask) regeneratePartitionStats(tmpToResultSegments map[int64][]int64) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params)
	cli, err := chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
	if err != nil {
		log.Error("chunk manager init failed", zap.Error(err))
		return err
	}
	partitionStatsFile := path.Join(cli.RootPath(), common.PartitionStatsPath,
		metautil.JoinIDPath(t.GetCollectionID(), t.GetPartitionID()), t.plan.GetChannel(),
		strconv.FormatInt(t.GetPlanID(), 10))
	value, err := cli.Read(ctx, partitionStatsFile)
	if err != nil {
		log.Warn("read partition stats file failed", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
		return err
	}
	partitionStats, err := storage.DeserializePartitionsStatsSnapshot(value)
	if err != nil {
		log.Warn("deserialize partition stats failed", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
		return err
	}
	for from, to := range tmpToResultSegments {
		stats := partitionStats.SegmentStats[from]
		// a stats task has exactly one output segment
		for _, toID := range to {
			partitionStats.SegmentStats[toID] = stats
		}
		delete(partitionStats.SegmentStats, from)
	}
	partitionStatsBytes, err := storage.SerializePartitionStatsSnapshot(partitionStats)
	if err != nil {
		log.Warn("serialize partition stats failed", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
		return err
	}
	err = cli.Write(ctx, partitionStatsFile, partitionStatsBytes)
	if err != nil {
		log.Warn("save partition stats file failed", zap.Int64("planID", t.GetPlanID()),
			zap.String("path", partitionStatsFile), zap.Error(err))
		return err
	}
	return nil
}
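
// processIndexing waits until every result segment has finished building all of the
// collection's indexes, then completes the task; collections without indexes complete
// immediately.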
func (t *clusteringCompactionTask) processIndexing() error {
	// wait for segments to be indexed
	collectionIndexes := t.meta.GetIndexMeta().GetIndexesForCollection(t.GetCollectionID(), "")
	if len(collectionIndexes) == 0 {
		log.Debug("the collection has no index, no need to do indexing")
		return t.completeTask()
	}
	indexed := func() bool {
		for _, collectionIndex := range collectionIndexes {
			for _, segmentID := range t.GetResultSegments() {
				segmentIndexState := t.meta.GetIndexMeta().GetSegmentIndexState(t.GetCollectionID(), segmentID, collectionIndex.IndexID)
				log.Debug("segment index state", zap.String("segment", segmentIndexState.String()))
				if segmentIndexState.GetState() != commonpb.IndexState_Finished {
					return false
				}
			}
		}
		return true
	}()
	log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed), zap.Int64("planID", t.GetPlanID()), zap.Int64s("segments", t.ResultSegments))
	if indexed {
		return t.completeTask()
	}
	return nil
}

// completeTask persists the partition stats produced by the compaction and marks the
// task completed. Indexing is the final working state of a clustering compaction task,
// so this should run exactly once per task.
func (t *clusteringCompactionTask) completeTask() error {
	err := t.meta.GetPartitionStatsMeta().SavePartitionStatsInfo(&datapb.PartitionStatsInfo{
		CollectionID: t.GetCollectionID(),
		PartitionID:  t.GetPartitionID(),
		VChannel:     t.GetChannel(),
		Version:      t.GetPlanID(),
		SegmentIDs:   t.GetResultSegments(),
		CommitTime:   time.Now().Unix(),
	})
	if err != nil {
		return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err)
	}

	var operators []UpdateOperator
	for _, segID := range t.GetResultSegments() {
		operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetPlanID()))
	}
	err = t.meta.UpdateSegmentsInfo(operators...)
	if err != nil {
		return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentPartitionStatsVersion", err)
	}
	err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID())
	if err != nil {
		return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err)
	}
	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
}
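
// processAnalyzing checks the analyze task that computes clustering centroids and,
// once it finishes successfully, records the analyze version and submits the compaction.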
func (t *clusteringCompactionTask) processAnalyzing() error {
	analyzeTask := t.meta.GetAnalyzeMeta().GetTask(t.GetAnalyzeTaskID())
	if analyzeTask == nil {
		log.Warn("analyzeTask not found", zap.Int64("id", t.GetAnalyzeTaskID()))
		return merr.WrapErrAnalyzeTaskNotFound(t.GetAnalyzeTaskID()) // retryable
	}
	log.Info("check analyze task state", zap.Int64("id", t.GetAnalyzeTaskID()), zap.Int64("version", analyzeTask.GetVersion()), zap.String("state", analyzeTask.State.String()))
	switch analyzeTask.State {
	case indexpb.JobState_JobStateFinished:
		if analyzeTask.GetCentroidsFile() == "" {
			// not retryable: the task reports finished but produced no centroids file,
			// since vector clustering is not supported in the open-source build
			return merr.WrapErrClusteringCompactionNotSupportVector()
		} else {
			t.AnalyzeVersion = analyzeTask.GetVersion()
			return t.doCompact()
		}
	case indexpb.JobState_JobStateFailed:
		log.Warn("analyze task fail", zap.Int64("analyzeID", t.GetAnalyzeTaskID()))
		return errors.New(analyzeTask.FailReason)
	default:
	}
	return nil
}

func (t *clusteringCompactionTask) resetSegmentCompacting() {
	t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
}
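
// processFailedOrTimeout rolls a failed or timed-out task back: it reverts segment
// levels, clears partition stats versions, releases the compacting flag, drops any
// uploaded partition stats, and marks the task cleaned.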
func (t *clusteringCompactionTask) processFailedOrTimeout() error {
	log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String()))
	// revert segments meta
	var operators []UpdateOperator
	// revert the level of input segments
	// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
	// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
	for _, segID := range t.InputSegments {
		operators = append(operators, RevertSegmentLevelOperator(segID))
	}
	// if result segments were generated but the task failed in a later step, mark them as L1 segments without partition stats
	for _, segID := range t.ResultSegments {
		operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
		operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
	}
	err := t.meta.UpdateSegmentsInfo(operators...)
	if err != nil {
		log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
		return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
	}
	t.resetSegmentCompacting()

	// drop partition stats if uploaded
	partitionStatsInfo := &datapb.PartitionStatsInfo{
		CollectionID: t.GetCollectionID(),
		PartitionID:  t.GetPartitionID(),
		VChannel:     t.GetChannel(),
		Version:      t.GetPlanID(),
		SegmentIDs:   t.GetResultSegments(),
	}
	err = t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
	if err != nil {
		log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
	}
	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
}
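
// doAnalyze registers an analyze task for the clustering key field, enqueues it on the
// analyze scheduler, and moves the task to the analyzing state.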
func (t *clusteringCompactionTask) doAnalyze() error {
	analyzeTask := &indexpb.AnalyzeTask{
		CollectionID: t.GetCollectionID(),
		PartitionID:  t.GetPartitionID(),
		FieldID:      t.GetClusteringKeyField().FieldID,
		FieldName:    t.GetClusteringKeyField().Name,
		FieldType:    t.GetClusteringKeyField().DataType,
		SegmentIDs:   t.GetInputSegments(),
		TaskID:       t.GetAnalyzeTaskID(),
		State:        indexpb.JobState_JobStateInit,
	}
	err := t.meta.GetAnalyzeMeta().AddAnalyzeTask(analyzeTask)
	if err != nil {
		log.Warn("failed to create analyze task", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
		return err
	}
	t.analyzeScheduler.enqueue(newAnalyzeTask(t.GetAnalyzeTaskID()))
	log.Info("submit analyze task", zap.Int64("planID", t.GetPlanID()), zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("id", t.GetAnalyzeTaskID()))
	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
}
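
// doCompact builds the compaction plan and dispatches it to the assigned DataNode,
// releasing the node assignment when the node's slots are exhausted or the
// notification fails.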
func (t *clusteringCompactionTask) doCompact() error {
	log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
	if t.NeedReAssignNodeID() {
		log.RatedWarn(10, "not assign nodeID")
		return nil
	}
	log = log.With(zap.Int64("nodeID", t.GetNodeID()))

	// TODO: refine this logic. GetCompactionPlanResult returns a failed result when there is
	// no compaction on the datanode, which is surprising.
	// Check whether the compaction plan has already been submitted, since datacoord may crash
	// between calling sessions.Compaction and updating the task state to executing.
	// result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
	// if err != nil {
	//	if errors.Is(err, merr.ErrNodeNotFound) {
	//		log.Warn("GetCompactionPlanResult fail", zap.Error(err))
	//		// setNodeID(NullNodeID) to trigger reassign node ID
	//		t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
	//		return nil
	//	}
	//	return merr.WrapErrGetCompactionPlanResultFail(err)
	// }
	// if result != nil {
	//	log.Info("compaction already submitted")
	//	t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
	//	return nil
	// }

	var err error
	t.plan, err = t.BuildCompactionRequest()
	if err != nil {
		log.Warn("Failed to BuildCompactionRequest", zap.Error(err))
		return err
	}
	err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
	if err != nil {
		if errors.Is(err, merr.ErrDataNodeSlotExhausted) {
			log.Warn("fail to notify compaction tasks to DataNode because the node slots are exhausted")
			return t.updateAndSaveTaskMeta(setNodeID(NullNodeID))
		}
		log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
		return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
	}
	return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
}
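
// ShadowClone returns a copy of the underlying CompactionTask with the given options
// applied, leaving the receiver untouched until the copy is persisted.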
func (t *clusteringCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.CompactionTask {
	taskClone := &datapb.CompactionTask{
		PlanID:             t.GetPlanID(),
		TriggerID:          t.GetTriggerID(),
		State:              t.GetState(),
		StartTime:          t.GetStartTime(),
		EndTime:            t.GetEndTime(),
		TimeoutInSeconds:   t.GetTimeoutInSeconds(),
		Type:               t.GetType(),
		CollectionTtl:      t.CollectionTtl,
		CollectionID:       t.GetCollectionID(),
		PartitionID:        t.GetPartitionID(),
		Channel:            t.GetChannel(),
		InputSegments:      t.GetInputSegments(),
		ResultSegments:     t.GetResultSegments(),
		TotalRows:          t.TotalRows,
		Schema:             t.Schema,
		NodeID:             t.GetNodeID(),
		FailReason:         t.GetFailReason(),
		RetryTimes:         t.GetRetryTimes(),
		Pos:                t.GetPos(),
		ClusteringKeyField: t.GetClusteringKeyField(),
		MaxSegmentRows:     t.GetMaxSegmentRows(),
		PreferSegmentRows:  t.GetPreferSegmentRows(),
		AnalyzeTaskID:      t.GetAnalyzeTaskID(),
		AnalyzeVersion:     t.GetAnalyzeVersion(),
		LastStateStartTime: t.GetLastStateStartTime(),
	}
	for _, opt := range opts {
		opt(taskClone)
	}
	return taskClone
}

func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {
	task := t.ShadowClone(opts...)
	err := t.saveTaskMeta(task)
	if err != nil {
		log.Warn("Failed to saveTaskMeta", zap.Error(err))
		return merr.WrapErrClusteringCompactionMetaError("updateAndSaveTaskMeta", err) // retryable
	}
	t.CompactionTask = task
	return nil
}
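
// checkTimeout reports whether the task has exceeded its configured timeout since its
// start time; a non-positive timeout disables the check.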
func (t *clusteringCompactionTask) checkTimeout() bool {
	if t.GetTimeoutInSeconds() > 0 {
		diff := time.Since(time.Unix(t.GetStartTime(), 0)).Seconds()
		if diff > float64(t.GetTimeoutInSeconds()) {
			log.Warn("compaction timeout",
				zap.Int32("timeout in seconds", t.GetTimeoutInSeconds()),
				zap.Int64("startTime", t.GetStartTime()),
			)
			return true
		}
	}
	return false
}

func (t *clusteringCompactionTask) saveTaskMeta(task *datapb.CompactionTask) error {
	return t.meta.SaveCompactionTask(task)
}

func (t *clusteringCompactionTask) SaveTaskMeta() error {
	return t.saveTaskMeta(t.CompactionTask)
}

func (t *clusteringCompactionTask) GetPlan() *datapb.CompactionPlan {
	return t.plan
}

func (t *clusteringCompactionTask) GetResult() *datapb.CompactionPlanResult {
	return t.result
}

func (t *clusteringCompactionTask) GetSpan() trace.Span {
	return t.span
}

func (t *clusteringCompactionTask) EndSpan() {
	if t.span != nil {
		t.span.End()
	}
}

func (t *clusteringCompactionTask) SetResult(result *datapb.CompactionPlanResult) {
	t.result = result
}

func (t *clusteringCompactionTask) SetSpan(span trace.Span) {
	t.span = span
}

func (t *clusteringCompactionTask) SetPlan(plan *datapb.CompactionPlan) {
	t.plan = plan
}

func (t *clusteringCompactionTask) SetTask(ct *datapb.CompactionTask) {
	t.CompactionTask = ct
}

func (t *clusteringCompactionTask) SetNodeID(id UniqueID) error {
	return t.updateAndSaveTaskMeta(setNodeID(id))
}

func (t *clusteringCompactionTask) GetLabel() string {
	return fmt.Sprintf("%d-%s", t.PartitionID, t.GetChannel())
}

func (t *clusteringCompactionTask) NeedReAssignNodeID() bool {
	return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID)
}

func (t *clusteringCompactionTask) GetSlotUsage() int64 {
	return t.slotUsage
}
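
// CleanLogPath drops the binlog references cached in the plan and result so finished
// tasks do not keep large path lists in memory.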
func (t *clusteringCompactionTask) CleanLogPath() {
	if t.plan.GetSegmentBinlogs() != nil {
		for _, binlogs := range t.plan.GetSegmentBinlogs() {
			binlogs.FieldBinlogs = nil
			binlogs.Field2StatslogPaths = nil
			binlogs.Deltalogs = nil
		}
	}
	if t.result.GetSegments() != nil {
		for _, segment := range t.result.GetSegments() {
			segment.InsertLogs = nil
			segment.Deltalogs = nil
			segment.Field2StatslogPaths = nil
		}
	}
}