mirror of https://github.com/milvus-io/milvus.git
fix: Change memoryCheck write lock to read lock (#37525)
See also: milvus-io#37493 Signed-off-by: yangxuan <xuan.yang@zilliz.com> Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/37707/head
parent
d1596297d9
commit
5a23c80f20
|
@ -175,7 +175,8 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
|
||||||
zap.Int64s("sealSegments", sealedSegmentIDs),
|
zap.Int64s("sealSegments", sealedSegmentIDs),
|
||||||
zap.Int("flushedSegmentsCount", len(flushSegmentIDs)),
|
zap.Int("flushedSegmentsCount", len(flushSegmentIDs)),
|
||||||
zap.Time("timeOfSeal", timeOfSeal),
|
zap.Time("timeOfSeal", timeOfSeal),
|
||||||
zap.Time("flushTs", tsoutil.PhysicalTime(ts)))
|
zap.Uint64("flushTs", ts),
|
||||||
|
zap.Time("flushTs in time", tsoutil.PhysicalTime(ts)))
|
||||||
|
|
||||||
return &datapb.FlushResponse{
|
return &datapb.FlushResponse{
|
||||||
Status: merr.Success(),
|
Status: merr.Success(),
|
||||||
|
@ -1273,7 +1274,8 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
|
||||||
// GetFlushState gets the flush state of the collection based on the provided flush ts and segment IDs.
|
// GetFlushState gets the flush state of the collection based on the provided flush ts and segment IDs.
|
||||||
func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
|
func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
|
||||||
log := log.Ctx(ctx).With(zap.Int64("collection", req.GetCollectionID()),
|
log := log.Ctx(ctx).With(zap.Int64("collection", req.GetCollectionID()),
|
||||||
zap.Time("flushTs", tsoutil.PhysicalTime(req.GetFlushTs()))).
|
zap.Uint64("flushTs", req.GetFlushTs()),
|
||||||
|
zap.Time("flushTs in time", tsoutil.PhysicalTime(req.GetFlushTs()))).
|
||||||
WithRateGroup("dc.GetFlushState", 1, 60)
|
WithRateGroup("dc.GetFlushState", 1, 60)
|
||||||
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
|
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
|
||||||
return &milvuspb.GetFlushStateResponse{
|
return &milvuspb.GetFlushStateResponse{
|
||||||
|
|
|
@ -202,7 +202,7 @@ func (t *SyncTask) Run(ctx context.Context) (err error) {
|
||||||
log.Info("segment removed", zap.Int64("segmentID", t.segment.SegmentID()), zap.String("channel", t.channelName))
|
log.Info("segment removed", zap.Int64("segmentID", t.segment.SegmentID()), zap.String("channel", t.channelName))
|
||||||
}
|
}
|
||||||
|
|
||||||
t.execTime = t.tr.RecordSpan()
|
t.execTime = t.tr.ElapseSpan()
|
||||||
log.Info("task done", zap.Int64("flushedSize", totalSize), zap.Duration("timeTaken", t.execTime))
|
log.Info("task done", zap.Int64("flushedSize", totalSize), zap.Duration("timeTaken", t.execTime))
|
||||||
|
|
||||||
if !t.isFlush {
|
if !t.isFlush {
|
||||||
|
|
|
@ -96,9 +96,16 @@ func (m *bufferManager) memoryCheck() {
|
||||||
if !paramtable.Get().DataNodeCfg.MemoryForceSyncEnable.GetAsBool() {
|
if !paramtable.Get().DataNodeCfg.MemoryForceSyncEnable.GetAsBool() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
startTime := time.Now()
|
||||||
|
m.mut.RLock()
|
||||||
|
defer func() {
|
||||||
|
dur := time.Since(startTime)
|
||||||
|
if dur > 30*time.Second {
|
||||||
|
log.Warn("memory check takes too long", zap.Duration("time", dur))
|
||||||
|
}
|
||||||
|
m.mut.RUnlock()
|
||||||
|
}()
|
||||||
|
|
||||||
m.mut.Lock()
|
|
||||||
defer m.mut.Unlock()
|
|
||||||
for {
|
for {
|
||||||
var total int64
|
var total int64
|
||||||
var candidate WriteBuffer
|
var candidate WriteBuffer
|
||||||
|
|
Loading…
Reference in New Issue