From 0d4e781f69284b2a09428b070499b5c6f8ec830e Mon Sep 17 00:00:00 2001 From: yah01 Date: Wed, 17 Jan 2024 23:30:55 +0800 Subject: [PATCH] fix: the system rejects all queries and never recovers if enabled read rate limit (#30061) fix #30060 Signed-off-by: yah01 --- internal/querynodev2/collector/counter.go | 17 ++++++----------- internal/querynodev2/collector/counter_test.go | 4 ++-- .../tasks/concurrent_safe_scheduler.go | 4 ++-- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/internal/querynodev2/collector/counter.go b/internal/querynodev2/collector/counter.go index 36091e8a19..990d5f9876 100644 --- a/internal/querynodev2/collector/counter.go +++ b/internal/querynodev2/collector/counter.go @@ -25,7 +25,7 @@ type counter struct { values map[string]int64 } -func (c *counter) Inc(label string, value int64) { +func (c *counter) Add(label string, value int64) { c.Lock() defer c.Unlock() @@ -38,17 +38,12 @@ func (c *counter) Inc(label string, value int64) { } } -func (c *counter) Dec(label string, value int64) { - c.Lock() - defer c.Unlock() +func (c *counter) Inc(label string) { + c.Add(label, 1) +} - v, ok := c.values[label] - if !ok { - c.values[label] = -value - } else { - v -= value - c.values[label] = v - } +func (c *counter) Dec(label string) { + c.Add(label, -1) } func (c *counter) Set(label string, value int64) { diff --git a/internal/querynodev2/collector/counter_test.go b/internal/querynodev2/collector/counter_test.go index 731dd6477b..d1c525bec1 100644 --- a/internal/querynodev2/collector/counter_test.go +++ b/internal/querynodev2/collector/counter_test.go @@ -39,7 +39,7 @@ func (suite *CounterTestSuite) TestBasic() { suite.Equal(int64(0), value) // get after inc - suite.counter.Inc(suite.label, 3) + suite.counter.Add(suite.label, 3) value = suite.counter.Get(suite.label) suite.Equal(int64(3), value) @@ -49,7 +49,7 @@ func (suite *CounterTestSuite) TestBasic() { suite.Equal(int64(0), value) // get after dec - suite.counter.Dec(suite.label, 3) + suite.counter.Add(suite.label, -3) value = suite.counter.Get(suite.label) suite.Equal(int64(-3), value) diff --git a/internal/querynodev2/tasks/concurrent_safe_scheduler.go b/internal/querynodev2/tasks/concurrent_safe_scheduler.go index 0a2396e8e3..0454207361 100644 --- a/internal/querynodev2/tasks/concurrent_safe_scheduler.go +++ b/internal/querynodev2/tasks/concurrent_safe_scheduler.go @@ -232,13 +232,13 @@ func (s *scheduler) exec() { s.getPool(t).Submit(func() (any, error) { // Update concurrency metric and notify task done. metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc() - collector.Counter.Inc(metricsinfo.ExecuteQueueType, 1) + collector.Counter.Inc(metricsinfo.ExecuteQueueType) err := t.Execute() // Update all metric after task finished. metrics.QueryNodeReadTaskConcurrency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() - collector.Counter.Dec(metricsinfo.ExecuteQueueType, -1) + collector.Counter.Dec(metricsinfo.ExecuteQueueType) // Notify task done. t.Done(err)