mirror of https://github.com/milvus-io/milvus.git
fix: [cp24]Change memoryCheck write lock to read lock (#37526)
pr: #37525 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/37713/head
parent
c50cb8d3ef
commit
5d5f899274
|
@ -170,7 +170,8 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
|
|||
zap.Int64s("sealSegments", sealedSegmentIDs),
|
||||
zap.Int("flushedSegmentsCount", len(flushSegmentIDs)),
|
||||
zap.Time("timeOfSeal", timeOfSeal),
|
||||
zap.Time("flushTs", tsoutil.PhysicalTime(ts)))
|
||||
zap.Uint64("flushTs", ts),
|
||||
zap.Time("flushTs in time", tsoutil.PhysicalTime(ts)))
|
||||
|
||||
return &datapb.FlushResponse{
|
||||
Status: merr.Success(),
|
||||
|
@ -1227,7 +1228,8 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
|
|||
// GetFlushState gets the flush state of the collection based on the provided flush ts and segment IDs.
|
||||
func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
|
||||
log := log.Ctx(ctx).With(zap.Int64("collection", req.GetCollectionID()),
|
||||
zap.Time("flushTs", tsoutil.PhysicalTime(req.GetFlushTs()))).
|
||||
zap.Uint64("flushTs", req.GetFlushTs()),
|
||||
zap.Time("flushTs in time", tsoutil.PhysicalTime(req.GetFlushTs()))).
|
||||
WithRateGroup("dc.GetFlushState", 1, 60)
|
||||
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
|
||||
return &milvuspb.GetFlushStateResponse{
|
||||
|
|
|
@ -184,7 +184,7 @@ func (t *SyncTask) Run() (err error) {
|
|||
log.Info("segment removed", zap.Int64("segmentID", t.segment.SegmentID()), zap.String("channel", t.channelName))
|
||||
}
|
||||
|
||||
log.Info("task done", zap.Float64("flushedSize", totalSize))
|
||||
log.Info("task done", zap.Float64("flushedSize", totalSize), zap.Duration("timeTaken", t.tr.ElapseSpan()))
|
||||
|
||||
if !t.isFlush {
|
||||
metrics.DataNodeAutoFlushBufferCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel, t.level.String()).Inc()
|
||||
|
|
|
@ -92,9 +92,16 @@ func (m *bufferManager) memoryCheck() {
|
|||
if !paramtable.Get().DataNodeCfg.MemoryForceSyncEnable.GetAsBool() {
|
||||
return
|
||||
}
|
||||
startTime := time.Now()
|
||||
m.mut.RLock()
|
||||
defer func() {
|
||||
dur := time.Since(startTime)
|
||||
if dur > 30*time.Second {
|
||||
log.Warn("memory check takes too long", zap.Duration("time", dur))
|
||||
}
|
||||
m.mut.RUnlock()
|
||||
}()
|
||||
|
||||
m.mut.Lock()
|
||||
defer m.mut.Unlock()
|
||||
for {
|
||||
var total int64
|
||||
var candidate WriteBuffer
|
||||
|
|
Loading…
Reference in New Issue