enhance: unify time in clustering compaction task to unix (#35167)

#34495

Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/34947/head
wayblink 2024-08-02 10:30:19 +08:00 committed by GitHub
parent 27b6d58981
commit 95462668ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 15 additions and 26 deletions

View File

@ -402,7 +402,7 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
for _, tasks := range triggers {
for _, task := range tasks {
if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned {
duration := time.Since(time.UnixMilli(task.StartTime)).Seconds()
duration := time.Since(time.Unix(task.StartTime, 0)).Seconds()
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) {
// try best to delete meta
err := c.meta.DropCompactionTask(task)

View File

@ -87,21 +87,21 @@ func (t *clusteringCompactionTask) Process() bool {
// task state update, refresh retry times count
currentState := t.State.String()
if currentState != lastState {
ts := time.Now().UnixMilli()
ts := time.Now().Unix()
lastStateDuration := ts - t.GetLastStateStartTime()
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse", lastStateDuration))
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState).
Observe(float64(lastStateDuration))
Observe(float64(lastStateDuration * 1000))
updateOps := []compactionTaskOpt{setRetryTimes(0), setLastStateStartTime(ts)}
if t.State == datapb.CompactionTaskState_completed {
if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned {
updateOps = append(updateOps, setEndTime(ts))
elapse := ts - t.StartTime
log.Info("clustering compaction task total elapse", zap.Int64("elapse", elapse))
log.Info("clustering compaction task total elapse", zap.Int64("elapse seconds", elapse))
metrics.DataCoordCompactionLatency.
WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), "total").
Observe(float64(elapse))
Observe(float64(elapse * 1000))
}
err = t.updateAndSaveTaskMeta(updateOps...)
if err != nil {
@ -561,10 +561,6 @@ func (t *clusteringCompactionTask) EndSpan() {
}
}
func (t *clusteringCompactionTask) SetStartTime(startTime int64) {
t.StartTime = startTime
}
func (t *clusteringCompactionTask) SetResult(result *datapb.CompactionPlanResult) {
t.result = result
}

View File

@ -201,10 +201,6 @@ func (t *l0CompactionTask) GetPlan() *datapb.CompactionPlan {
return t.plan
}
func (t *l0CompactionTask) SetStartTime(startTime int64) {
t.StartTime = startTime
}
func (t *l0CompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID)
}

View File

@ -769,19 +769,19 @@ func (s *CompactionPlanHandlerSuite) TestCompactionGC() {
PlanID: 1,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_completed,
StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(),
StartTime: time.Now().Add(-time.Second * 100000).Unix(),
},
{
PlanID: 2,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_cleaned,
StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(),
StartTime: time.Now().Add(-time.Second * 100000).Unix(),
},
{
PlanID: 3,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_cleaned,
StartTime: time.Now().UnixMilli(),
StartTime: time.Now().Unix(),
},
}

View File

@ -269,7 +269,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context,
TriggerID: taskID, // inner trigger, use task id as trigger id
PlanID: taskID,
Type: datapb.CompactionType_Level0DeleteCompaction,
StartTime: time.Now().UnixMilli(),
StartTime: time.Now().Unix(),
InputSegments: levelZeroSegs,
State: datapb.CompactionTaskState_pipelining,
Channel: view.GetGroupLabel().Channel,
@ -329,7 +329,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
PlanID: taskID,
TriggerID: view.(*ClusteringSegmentsView).triggerID,
State: datapb.CompactionTaskState_pipelining,
StartTime: time.Now().UnixMilli(),
StartTime: time.Now().Unix(),
CollectionTtl: view.(*ClusteringSegmentsView).collectionTTL.Nanoseconds(),
TimeoutInSeconds: Params.DataCoordCfg.ClusteringCompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_ClusteringCompaction,
@ -344,7 +344,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
PreferSegmentRows: preferSegmentRows,
TotalRows: totalRows,
AnalyzeTaskID: taskID + 1,
LastStateStartTime: time.Now().UnixMilli(),
LastStateStartTime: time.Now().Unix(),
}
err = m.compactionHandler.enqueueCompaction(task)
if err != nil {
@ -383,7 +383,7 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
PlanID: taskID,
TriggerID: view.(*MixSegmentView).triggerID,
State: datapb.CompactionTaskState_pipelining,
StartTime: time.Now().UnixMilli(),
StartTime: time.Now().Unix(),
CollectionTtl: view.(*MixSegmentView).collectionTTL.Nanoseconds(),
TimeoutInSeconds: Params.DataCoordCfg.ClusteringCompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_MixCompaction, // todo: use SingleCompaction
@ -394,7 +394,7 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
InputSegments: lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }),
ResultSegments: []int64{taskID + 1},
TotalRows: totalRows,
LastStateStartTime: time.Now().UnixMilli(),
LastStateStartTime: time.Now().Unix(),
}
err = m.compactionHandler.enqueueCompaction(task)
if err != nil {

View File

@ -26,7 +26,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@ -35,7 +34,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@ -72,7 +70,6 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() {
s.mockAlloc.EXPECT().Alloc(mock.Anything).RunAndReturn(func(x uint32) (int64, int64, error) {
start := s.mockID.Load()
end := s.mockID.Add(int64(x))
log.Info("wayblink", zap.Int64("start", start), zap.Int64("end", end))
return start, end, nil
}).Maybe()
s.mockAlloc.EXPECT().AllocOne().RunAndReturn(func() (int64, error) {