diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 98a09a65f6..8e15fa45af 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -402,7 +402,7 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() { for _, tasks := range triggers { for _, task := range tasks { if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned { - duration := time.Since(time.UnixMilli(task.StartTime)).Seconds() + duration := time.Since(time.Unix(task.StartTime, 0)).Seconds() if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) { // try best to delete meta err := c.meta.DropCompactionTask(task) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index e0b40e0b15..693689d2df 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -87,21 +87,21 @@ func (t *clusteringCompactionTask) Process() bool { // task state update, refresh retry times count currentState := t.State.String() if currentState != lastState { - ts := time.Now().UnixMilli() + ts := time.Now().Unix() lastStateDuration := ts - t.GetLastStateStartTime() - log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse", lastStateDuration)) + log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration)) metrics.DataCoordCompactionLatency. WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), lastState). - Observe(float64(lastStateDuration)) + Observe(float64(lastStateDuration * 1000)) updateOps := []compactionTaskOpt{setRetryTimes(0), setLastStateStartTime(ts)} - if t.State == datapb.CompactionTaskState_completed { + if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned { updateOps = append(updateOps, setEndTime(ts)) elapse := ts - t.StartTime - log.Info("clustering compaction task total elapse", zap.Int64("elapse", elapse)) + log.Info("clustering compaction task total elapse", zap.Int64("elapse seconds", elapse)) metrics.DataCoordCompactionLatency. WithLabelValues(fmt.Sprint(typeutil.IsVectorType(t.GetClusteringKeyField().DataType)), fmt.Sprint(t.CollectionID), t.Channel, datapb.CompactionType_ClusteringCompaction.String(), "total"). - Observe(float64(elapse)) + Observe(float64(elapse * 1000)) } err = t.updateAndSaveTaskMeta(updateOps...) if err != nil { @@ -561,10 +561,6 @@ func (t *clusteringCompactionTask) EndSpan() { } } -func (t *clusteringCompactionTask) SetStartTime(startTime int64) { - t.StartTime = startTime -} - func (t *clusteringCompactionTask) SetResult(result *datapb.CompactionPlanResult) { t.result = result } diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index b5b79d4b21..cf531eeb87 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -201,10 +201,6 @@ func (t *l0CompactionTask) GetPlan() *datapb.CompactionPlan { return t.plan } -func (t *l0CompactionTask) SetStartTime(startTime int64) { - t.StartTime = startTime -} - func (t *l0CompactionTask) NeedReAssignNodeID() bool { return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID) } diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 00829953f7..f9f534de43 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -769,19 +769,19 @@ func (s *CompactionPlanHandlerSuite) TestCompactionGC() { PlanID: 1, Type: datapb.CompactionType_MixCompaction, State: datapb.CompactionTaskState_completed, - StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(), + StartTime: time.Now().Add(-time.Second * 100000).Unix(), }, { PlanID: 2, Type: datapb.CompactionType_MixCompaction, State: datapb.CompactionTaskState_cleaned, - StartTime: time.Now().UnixMilli() - (time.Second * 100000).Milliseconds(), + StartTime: time.Now().Add(-time.Second * 100000).Unix(), }, { PlanID: 3, Type: datapb.CompactionType_MixCompaction, State: datapb.CompactionTaskState_cleaned, - StartTime: time.Now().UnixMilli(), + StartTime: time.Now().Unix(), }, } diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index bb1004e183..1ed09a4daa 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -269,7 +269,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, TriggerID: taskID, // inner trigger, use task id as trigger id PlanID: taskID, Type: datapb.CompactionType_Level0DeleteCompaction, - StartTime: time.Now().UnixMilli(), + StartTime: time.Now().Unix(), InputSegments: levelZeroSegs, State: datapb.CompactionTaskState_pipelining, Channel: view.GetGroupLabel().Channel, @@ -329,7 +329,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C PlanID: taskID, TriggerID: view.(*ClusteringSegmentsView).triggerID, State: datapb.CompactionTaskState_pipelining, - StartTime: time.Now().UnixMilli(), + StartTime: time.Now().Unix(), CollectionTtl: view.(*ClusteringSegmentsView).collectionTTL.Nanoseconds(), TimeoutInSeconds: Params.DataCoordCfg.ClusteringCompactionTimeoutInSeconds.GetAsInt32(), Type: datapb.CompactionType_ClusteringCompaction, @@ -344,7 +344,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C PreferSegmentRows: preferSegmentRows, TotalRows: totalRows, AnalyzeTaskID: taskID + 1, - LastStateStartTime: time.Now().UnixMilli(), + LastStateStartTime: time.Now().Unix(), } err = m.compactionHandler.enqueueCompaction(task) if err != nil { @@ -383,7 +383,7 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte PlanID: taskID, TriggerID: view.(*MixSegmentView).triggerID, State: datapb.CompactionTaskState_pipelining, - StartTime: time.Now().UnixMilli(), + StartTime: time.Now().Unix(), CollectionTtl: view.(*MixSegmentView).collectionTTL.Nanoseconds(), TimeoutInSeconds: Params.DataCoordCfg.ClusteringCompactionTimeoutInSeconds.GetAsInt32(), Type: datapb.CompactionType_MixCompaction, // todo: use SingleCompaction @@ -394,7 +394,7 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte InputSegments: lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }), ResultSegments: []int64{taskID + 1}, TotalRows: totalRows, - LastStateStartTime: time.Now().UnixMilli(), + LastStateStartTime: time.Now().Unix(), } err = m.compactionHandler.enqueueCompaction(task) if err != nil { diff --git a/internal/datanode/compaction/clustering_compactor_test.go b/internal/datanode/compaction/clustering_compactor_test.go index 98ee467340..1b5aba34d1 100644 --- a/internal/datanode/compaction/clustering_compactor_test.go +++ b/internal/datanode/compaction/clustering_compactor_test.go @@ -26,7 +26,6 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "go.uber.org/atomic" - "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -35,7 +34,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -72,7 +70,6 @@ func (s *ClusteringCompactionTaskSuite) SetupTest() { s.mockAlloc.EXPECT().Alloc(mock.Anything).RunAndReturn(func(x uint32) (int64, int64, error) { start := s.mockID.Load() end := s.mockID.Add(int64(x)) - log.Info("wayblink", zap.Int64("start", start), zap.Int64("end", end)) return start, end, nil }).Maybe() s.mockAlloc.EXPECT().AllocOne().RunAndReturn(func() (int64, error) {