mirror of https://github.com/milvus-io/milvus.git
fix: Avoid datarace in clustering compaction (#34288)
#34289 Signed-off-by: wayblink <anyang.wang@zilliz.com> pull/34339/head
parent
8c42f1341d
commit
da56880d0f
|
@ -100,3 +100,15 @@ func setStartTime(startTime int64) compactionTaskOpt {
|
|||
task.StartTime = startTime
|
||||
}
|
||||
}
|
||||
|
||||
// setRetryTimes returns a compactionTaskOpt that, when applied to a
// *datapb.CompactionTask, assigns retryTimes to the task's RetryTimes field.
func setRetryTimes(retryTimes int32) compactionTaskOpt {
|
||||
return func(task *datapb.CompactionTask) {
|
||||
task.RetryTimes = retryTimes
|
||||
}
|
||||
}
|
||||
|
||||
// setLastStateStartTime returns a compactionTaskOpt that, when applied to a
// *datapb.CompactionTask, assigns lastStateStartTime to the task's
// LastStateStartTime field.
// NOTE(review): the visible caller passes time.Now().UnixMilli(), so the value
// is presumably a Unix timestamp in milliseconds — confirm against other callers.
func setLastStateStartTime(lastStateStartTime int64) compactionTaskOpt {
|
||||
return func(task *datapb.CompactionTask) {
|
||||
task.LastStateStartTime = lastStateStartTime
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,10 +47,9 @@ const (
|
|||
|
||||
type clusteringCompactionTask struct {
|
||||
*datapb.CompactionTask
|
||||
plan *datapb.CompactionPlan
|
||||
result *datapb.CompactionPlanResult
|
||||
span trace.Span
|
||||
lastUpdateStateTime int64
|
||||
plan *datapb.CompactionPlan
|
||||
result *datapb.CompactionPlanResult
|
||||
span trace.Span
|
||||
|
||||
meta CompactionMeta
|
||||
sessions SessionManager
|
||||
|
@ -66,24 +65,22 @@ func (t *clusteringCompactionTask) Process() bool {
|
|||
log.Warn("fail in process task", zap.Error(err))
|
||||
if merr.IsRetryableErr(err) && t.RetryTimes < taskMaxRetryTimes {
|
||||
// retry in next Process
|
||||
t.RetryTimes = t.RetryTimes + 1
|
||||
t.updateAndSaveTaskMeta(setRetryTimes(t.RetryTimes + 1))
|
||||
} else {
|
||||
log.Error("task fail with unretryable reason or meet max retry times", zap.Error(err))
|
||||
t.State = datapb.CompactionTaskState_failed
|
||||
t.FailReason = err.Error()
|
||||
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
|
||||
}
|
||||
}
|
||||
// task state update, refresh retry times count
|
||||
currentState := t.State.String()
|
||||
if currentState != lastState {
|
||||
t.RetryTimes = 0
|
||||
ts := time.Now().UnixMilli()
|
||||
lastStateDuration := ts - t.lastUpdateStateTime
|
||||
t.updateAndSaveTaskMeta(setRetryTimes(0), setLastStateStartTime(ts))
|
||||
lastStateDuration := ts - t.GetLastStateStartTime()
|
||||
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse", lastStateDuration))
|
||||
metrics.DataCoordCompactionLatency.
|
||||
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), datapb.CompactionType_ClusteringCompaction.String(), lastState).
|
||||
Observe(float64(lastStateDuration))
|
||||
t.lastUpdateStateTime = ts
|
||||
|
||||
if t.State == datapb.CompactionTaskState_completed {
|
||||
t.updateAndSaveTaskMeta(setEndTime(ts))
|
||||
|
|
|
@ -913,6 +913,7 @@ message CompactionTask{
|
|||
int64 prefer_segment_rows = 22;
|
||||
int64 analyzeTaskID = 23;
|
||||
int64 analyzeVersion = 24;
|
||||
int64 lastStateStartTime = 25;
|
||||
}
|
||||
|
||||
message PartitionStatsInfo {
|
||||
|
|
Loading…
Reference in New Issue