diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 086d83ce94..3b263a9383 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -175,7 +175,8 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F zap.Int64s("sealSegments", sealedSegmentIDs), zap.Int("flushedSegmentsCount", len(flushSegmentIDs)), zap.Time("timeOfSeal", timeOfSeal), - zap.Time("flushTs", tsoutil.PhysicalTime(ts))) + zap.Uint64("flushTs", ts), + zap.Time("flushTs in time", tsoutil.PhysicalTime(ts))) return &datapb.FlushResponse{ Status: merr.Success(), @@ -1273,7 +1274,8 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq // GetFlushState gets the flush state of the collection based on the provided flush ts and segment IDs. func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) { log := log.Ctx(ctx).With(zap.Int64("collection", req.GetCollectionID()), - zap.Time("flushTs", tsoutil.PhysicalTime(req.GetFlushTs()))). + zap.Uint64("flushTs", req.GetFlushTs()), + zap.Time("flushTs in time", tsoutil.PhysicalTime(req.GetFlushTs()))). WithRateGroup("dc.GetFlushState", 1, 60) if err := merr.CheckHealthy(s.GetStateCode()); err != nil { return &milvuspb.GetFlushStateResponse{ diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go index 1f0e4f8c30..b7463b04b9 100644 --- a/internal/flushcommon/syncmgr/task.go +++ b/internal/flushcommon/syncmgr/task.go @@ -202,7 +202,7 @@ func (t *SyncTask) Run(ctx context.Context) (err error) { log.Info("segment removed", zap.Int64("segmentID", t.segment.SegmentID()), zap.String("channel", t.channelName)) } - t.execTime = t.tr.RecordSpan() + t.execTime = t.tr.ElapseSpan() log.Info("task done", zap.Int64("flushedSize", totalSize), zap.Duration("timeTaken", t.execTime)) if !t.isFlush { diff --git a/internal/flushcommon/writebuffer/manager.go b/internal/flushcommon/writebuffer/manager.go index 38ec89f2d8..a1311814c9 100644 --- a/internal/flushcommon/writebuffer/manager.go +++ b/internal/flushcommon/writebuffer/manager.go @@ -96,9 +96,16 @@ func (m *bufferManager) memoryCheck() { if !paramtable.Get().DataNodeCfg.MemoryForceSyncEnable.GetAsBool() { return } + startTime := time.Now() + m.mut.RLock() + defer func() { + dur := time.Since(startTime) + if dur > 30*time.Second { + log.Warn("memory check takes too long", zap.Duration("time", dur)) + } + m.mut.RUnlock() + }() - m.mut.Lock() - defer m.mut.Unlock() for { var total int64 var candidate WriteBuffer