diff --git a/internal/config/refresher.go b/internal/config/refresher.go index fff11a3b48..5b7922c622 100644 --- a/internal/config/refresher.go +++ b/internal/config/refresher.go @@ -54,6 +54,7 @@ func (r refresher) stop() { func (r refresher) refreshPeriodically(name string) { ticker := time.NewTicker(r.refreshInterval) + defer ticker.Stop() log.Info("start refreshing configurations", zap.String("source", name)) for { select { diff --git a/internal/datacoord/channel_checker.go b/internal/datacoord/channel_checker.go index 03f400f8bf..37655ade6c 100644 --- a/internal/datacoord/channel_checker.go +++ b/internal/datacoord/channel_checker.go @@ -92,10 +92,10 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe } stop := make(chan struct{}) - ticker := time.NewTimer(timeout) + timer := time.NewTimer(timeout) c.removeTimers([]string{channelName}) c.runningTimerStops.Store(channelName, stop) - c.runningTimers.Store(channelName, ticker) + c.runningTimers.Store(channelName, timer) go func() { log.Info("timer started", @@ -103,10 +103,10 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe zap.Int64("nodeID", nodeID), zap.String("channel name", channelName), zap.Duration("check interval", timeout)) - defer ticker.Stop() + defer timer.Stop() select { - case <-ticker.C: + case <-timer.C: // check tickle at path as :tickle/[prefix]/{channel_name} log.Info("timeout and stop timer: wait for channel ACK timeout", zap.String("watch state", watchState.String()), diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 08ee0c6341..b60b3442c4 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -248,12 +248,13 @@ func (c *ChannelManager) unwatchDroppedChannels() { // NOT USED. 
func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) { - timer := time.NewTicker(bgCheckInterval) + ticker := time.NewTicker(bgCheckInterval) + defer ticker.Stop() for { select { case <-ctx.Done(): return - case <-timer.C: + case <-ticker.C: c.mu.Lock() channels := c.store.GetNodesChannels() diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index e8d56e7e38..742d8ae36d 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -122,16 +122,16 @@ func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta func (c *compactionPlanHandler) start() { interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second) - ticker := time.NewTicker(interval) c.quit = make(chan struct{}) c.wg.Add(1) go func() { defer c.wg.Done() + ticker := time.NewTicker(interval) + defer ticker.Stop() for { select { case <-c.quit: - ticker.Stop() log.Info("compaction handler quit") return case <-ticker.C: diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 46a3a5e4b1..34850f527a 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -1474,12 +1474,14 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) { // should be split into two plans var plans []*datapb.CompactionPlan + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() WAIT: for { select { case val := <-spy.spyChan: plans = append(plans, val) - case <-time.After(3 * time.Second): + case <-ticker.C: break WAIT } } diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index f88072bc77..1ac972fc77 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -94,10 +94,11 @@ func (gc *garbageCollector) start() { // work contains actual looping check logic func (gc *garbageCollector) work() { defer gc.wg.Done() - ticker := time.Tick(gc.option.checkInterval) + ticker := time.NewTicker(gc.option.checkInterval) + defer ticker.Stop() for { select { - case <-ticker: + case <-ticker.C: gc.clearEtcd() gc.recycleUnusedIndexes() gc.recycleUnusedSegIndexes() diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go index 65f9adc721..068244aaa6 100644 --- a/internal/datanode/binlog_io.go +++ b/internal/datanode/binlog_io.go @@ -75,7 +75,6 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error g.Go(func() error { for err != nil { select { - case <-gCtx.Done(): log.Warn("ctx done when downloading kvs from blob storage") return errDownloadFromBlobStorage @@ -83,7 +82,7 @@ func (b *binlogIO) download(ctx context.Context, paths []string) ([]*Blob, error default: if err != errStart { log.Warn("downloading failed, retry in 50ms", zap.Strings("paths", paths)) - <-time.After(50 * time.Millisecond) + time.Sleep(50 * time.Millisecond) } vs, err = b.MultiRead(ctx, paths) } @@ -122,7 +121,7 @@ func (b *binlogIO) uploadSegmentFiles( log.Warn("save binlog failed, retry in 50ms", zap.Int64("collectionID", CollectionID), zap.Int64("segmentID", segID)) - <-time.After(50 * time.Millisecond) + time.Sleep(50 * time.Millisecond) } err = b.MultiWrite(ctx, kvs) } diff --git a/internal/datanode/event_manager.go b/internal/datanode/event_manager.go index 946473bb9a..7d9dce1b8f 100644 --- a/internal/datanode/event_manager.go +++ b/internal/datanode/event_manager.go @@ -122,8 +122,9 @@ func (t *tickler) watch() { } 
t.closeWg.Add(1) - ticker := time.NewTicker(t.interval) go func() { + ticker := time.NewTicker(t.interval) + defer ticker.Stop() for { select { case <-ticker.C: diff --git a/internal/datanode/event_manager_test.go b/internal/datanode/event_manager_test.go index 46a3445549..88aa506418 100644 --- a/internal/datanode/event_manager_test.go +++ b/internal/datanode/event_manager_test.go @@ -68,7 +68,7 @@ func TestChannelEventManager(t *testing.T) { select { case <-ch: - case <-time.NewTimer(time.Second).C: + case <-time.After(time.Second): t.FailNow() } close(em.eventChan) diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index 1dcc5b1096..be439d99d8 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -344,9 +344,8 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { fm.startDropping() delNode.Operate([]flowgraph.Msg{&msg}) }) - timer := time.NewTimer(time.Millisecond) select { - case <-timer.C: + case <-time.After(time.Millisecond): t.FailNow() case <-sig: } diff --git a/internal/datanode/flow_graph_time_ticker_test.go b/internal/datanode/flow_graph_time_ticker_test.go index 4c1b9d27e5..517a052017 100644 --- a/internal/datanode/flow_graph_time_ticker_test.go +++ b/internal/datanode/flow_graph_time_ticker_test.go @@ -44,14 +44,13 @@ func TestMergedTimeTicker_close10000(t *testing.T) { mt.close() }(mt) } - tm := time.NewTimer(10 * time.Second) done := make(chan struct{}) go func() { wg.Wait() close(done) }() select { - case <-tm.C: + case <-time.After(10 * time.Second): t.Fatal("wait all timer close, timeout") case <-done: } diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go index 03e52b937e..b3e4b05b95 100644 --- a/internal/datanode/flush_manager_test.go +++ b/internal/datanode/flush_manager_test.go @@ -378,9 +378,8 @@ func TestRendezvousFlushManager_waitForAllFlushQueue(t *testing.T) { }) } - timeout := time.NewTimer(time.Second) select { - case <-timeout.C: + case <-time.After(time.Second): t.FailNow() case <-signal: } diff --git a/internal/distributed/proxy/service_test.go b/internal/distributed/proxy/service_test.go index 4cfff88e05..f53e7b48a6 100644 --- a/internal/distributed/proxy/service_test.go +++ b/internal/distributed/proxy/service_test.go @@ -882,7 +882,6 @@ func withCredential(clientPemPath, clientKeyPath, clientCaPath string) (credenti // waitForGrpcReady block until service available or panic after times out. 
func waitForGrpcReady(opt *WaitOption) { Params := ¶mtable.Get().ProxyGrpcServerCfg - ticker := time.NewTicker(opt.Duration) ch := make(chan error, 1) go func() { @@ -910,6 +909,8 @@ func waitForGrpcReady(opt *WaitOption) { } }() + timer := time.NewTimer(opt.Duration) + select { case err := <-ch: if err != nil { @@ -918,7 +919,7 @@ func waitForGrpcReady(opt *WaitOption) { zap.Any("option", opt)) panic(err) } - case <-ticker.C: + case <-timer.C: log.Error("grpc service not ready", zap.Any("option", opt)) panic("grpc service not ready") diff --git a/internal/indexnode/indexnode_service_test.go b/internal/indexnode/indexnode_service_test.go index 5daee455d2..a0273a54f7 100644 --- a/internal/indexnode/indexnode_service_test.go +++ b/internal/indexnode/indexnode_service_test.go @@ -115,12 +115,13 @@ func TestIndexNodeSimple(t *testing.T) { ClusterID: clusterID, BuildIDs: []int64{buildID}, } - timeout := time.After(time.Second * 10) var idxInfo *indexpb.IndexTaskInfo + timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() Loop: for { select { - case <-timeout: + case <-timeoutCtx.Done(): t.Fatal("timeout for querying jobs") default: time.Sleep(1 * time.Millisecond) @@ -302,11 +303,12 @@ func TestIndexNodeComplex(t *testing.T) { }(i) } testwg.Wait() - timeout := time.After(time.Second * 30) + timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() Loop: for { select { - case <-timeout: + case <-timeoutCtx.Done(): t.Fatal("timeout testing") default: jobNumRet, err := in.GetJobStats(ctx, &indexpb.GetJobStatsRequest{}) diff --git a/internal/indexnode/taskinfo_ops.go b/internal/indexnode/taskinfo_ops.go index ba1d1cfac5..15440a313b 100644 --- a/internal/indexnode/taskinfo_ops.go +++ b/internal/indexnode/taskinfo_ops.go @@ -1,6 +1,7 @@ package indexnode import ( + "context" "time" "github.com/golang/protobuf/proto" @@ -110,15 +111,18 @@ func (i *IndexNode) waitTaskFinish() { } gracefulTimeout := Params.IndexNodeCfg.GracefulStopTimeout - timer := time.NewTimer(gracefulTimeout.GetAsDuration(time.Second)) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + timeoutCtx, cancel := context.WithTimeout(i.loopCtx, gracefulTimeout.GetAsDuration(time.Second)) + defer cancel() for { select { - case <-time.Tick(time.Second): + case <-ticker.C: if !i.hasInProgressTask() { return } - case <-timer.C: + case <-timeoutCtx.Done(): log.Warn("timeout, the index node has some progress task") for _, info := range i.tasks { if info.state == commonpb.IndexState_InProgress { diff --git a/internal/mq/mqimpl/rocksmq/client/client_impl_test.go b/internal/mq/mqimpl/rocksmq/client/client_impl_test.go index 7ec4411b43..845590f02a 100644 --- a/internal/mq/mqimpl/rocksmq/client/client_impl_test.go +++ b/internal/mq/mqimpl/rocksmq/client/client_impl_test.go @@ -185,12 +185,14 @@ func TestClient_SeekLatest(t *testing.T) { msgChan = consumer2.Chan() loop := true + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() for loop { select { case msg := <-msgChan: assert.Equal(t, len(msg.Payload), 8) loop = false - case <-time.After(2 * time.Second): + case <-ticker.C: msg := &ProducerMessage{ Payload: make([]byte, 8), } diff --git a/internal/mq/msgdispatcher/manager_test.go b/internal/mq/msgdispatcher/manager_test.go index ec95712422..2cac99330c 100644 --- a/internal/mq/msgdispatcher/manager_test.go +++ b/internal/mq/msgdispatcher/manager_test.go @@ -203,11 +203,11 @@ func (suite *SimulationSuite) produceMsg(wg *sync.WaitGroup) { func (suite 
*SimulationSuite) consumeMsg(ctx context.Context, wg *sync.WaitGroup, vchannel string) { defer wg.Done() var lastTs typeutil.Timestamp + timeoutCtx, cancel := context.WithTimeout(ctx, 5000*time.Millisecond) + defer cancel() for { select { - case <-ctx.Done(): - return - case <-time.After(5000 * time.Millisecond): // no message to consume + case <-timeoutCtx.Done(): return case pack := <-suite.vchannels[vchannel].output: assert.Greater(suite.T(), pack.EndTs, lastTs) @@ -231,11 +231,13 @@ func (suite *SimulationSuite) consumeMsg(ctx context.Context, wg *sync.WaitGroup func (suite *SimulationSuite) produceTimeTickOnly(ctx context.Context) { var tt = 1 + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() for { select { case <-ctx.Done(): return - case <-time.After(10 * time.Millisecond): + case <-ticker.C: ts := uint64(tt * 1000) err := suite.producer.Produce(&msgstream.MsgPack{ Msgs: []msgstream.TsMsg{genTimeTickMsg(ts)}, diff --git a/internal/mq/msgdispatcher/mock_test.go b/internal/mq/msgdispatcher/mock_test.go index 33fdb527e1..b83e8f7cd4 100644 --- a/internal/mq/msgdispatcher/mock_test.go +++ b/internal/mq/msgdispatcher/mock_test.go @@ -58,9 +58,11 @@ func getSeekPositions(factory msgstream.Factory, pchannel string, maxNum int) ([ defer stream.Close() stream.AsConsumer([]string{pchannel}, fmt.Sprintf("%d", rand.Int()), mqwrapper.SubscriptionPositionEarliest) positions := make([]*msgstream.MsgPosition, 0) + timeoutCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() for { select { - case <-time.After(100 * time.Millisecond): // no message to consume + case <-timeoutCtx.Done(): // no message to consume return positions, nil case pack := <-stream.Chan(): positions = append(positions, pack.EndPositions[0]) diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go index d124812831..d61ffbd1cb 100644 --- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go +++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client_test.go @@ -526,7 +526,8 @@ func TestPulsarClient_SeekLatest(t *testing.T) { defer consumer.Close() msgChan := consumer.Chan() - + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() loop := true for loop { select { @@ -536,7 +537,7 @@ func TestPulsarClient_SeekLatest(t *testing.T) { log.Info("RECV", zap.Any("v", v)) assert.Equal(t, v, 4) loop = false - case <-time.After(2 * time.Second): + case <-ticker.C: log.Info("after 2 seconds") msg := &mqwrapper.ProducerMessage{ Payload: IntToBytes(4), diff --git a/internal/proxy/channels_time_ticker_test.go b/internal/proxy/channels_time_ticker_test.go index 60958360d5..573b7e21e5 100644 --- a/internal/proxy/channels_time_ticker_test.go +++ b/internal/proxy/channels_time_ticker_test.go @@ -107,8 +107,8 @@ func TestChannelsTimeTickerImpl_getLastTick(t *testing.T) { tso := newMockTsoAllocator() ctx := context.Background() - ticker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso) - err := ticker.start() + channelTicker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso) + err := channelTicker.start() assert.Equal(t, nil, err) var wg sync.WaitGroup @@ -116,14 +116,15 @@ func TestChannelsTimeTickerImpl_getLastTick(t *testing.T) { b := make(chan struct{}, 1) go func() { defer wg.Done() - timer := time.NewTicker(interval * 40) + ticker := time.NewTicker(interval * 40) + defer ticker.Stop() for { select { case <-b: return - 
case <-timer.C: + case <-ticker.C: for _, pchan := range pchans { - ts, err := ticker.getLastTick(pchan) + ts, err := channelTicker.getLastTick(pchan) assert.Equal(t, nil, err) log.Debug("TestChannelsTimeTickerImpl_getLastTick", zap.Any("pchan", pchan), @@ -137,7 +138,7 @@ func TestChannelsTimeTickerImpl_getLastTick(t *testing.T) { wg.Wait() defer func() { - err := ticker.close() + err := channelTicker.close() assert.Equal(t, nil, err) }() @@ -154,8 +155,8 @@ func TestChannelsTimeTickerImpl_getMinTsStatistics(t *testing.T) { tso := newMockTsoAllocator() ctx := context.Background() - ticker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso) - err := ticker.start() + channelTicker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso) + err := channelTicker.start() assert.Equal(t, nil, err) var wg sync.WaitGroup @@ -163,13 +164,14 @@ func TestChannelsTimeTickerImpl_getMinTsStatistics(t *testing.T) { b := make(chan struct{}, 1) go func() { defer wg.Done() - timer := time.NewTicker(interval * 40) + ticker := time.NewTicker(interval * 40) + defer ticker.Stop() for { select { case <-b: return - case <-timer.C: - stats, _, err := ticker.getMinTsStatistics() + case <-ticker.C: + stats, _, err := channelTicker.getMinTsStatistics() assert.Equal(t, nil, err) for pchan, ts := range stats { log.Debug("TestChannelsTimeTickerImpl_getLastTick", @@ -184,7 +186,7 @@ func TestChannelsTimeTickerImpl_getMinTsStatistics(t *testing.T) { wg.Wait() defer func() { - err := ticker.close() + err := channelTicker.close() assert.Equal(t, nil, err) }() @@ -201,8 +203,8 @@ func TestChannelsTimeTickerImpl_getMinTick(t *testing.T) { tso := newMockTsoAllocator() ctx := context.Background() - ticker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso) - err := ticker.start() + channelTicker := newChannelsTimeTicker(ctx, interval, pchans, newGetStatisticsFunc(pchans), tso) + err := channelTicker.start() assert.Equal(t, nil, err) var wg sync.WaitGroup @@ -211,13 +213,14 @@ func TestChannelsTimeTickerImpl_getMinTick(t *testing.T) { ts := typeutil.ZeroTimestamp go func() { defer wg.Done() - timer := time.NewTicker(interval * 40) + ticker := time.NewTicker(interval * 40) + defer ticker.Stop() for { select { case <-b: return - case <-timer.C: - minTs := ticker.getMinTick() + case <-ticker.C: + minTs := channelTicker.getMinTick() assert.GreaterOrEqual(t, minTs, ts) } } @@ -227,7 +230,7 @@ func TestChannelsTimeTickerImpl_getMinTick(t *testing.T) { wg.Wait() defer func() { - err := ticker.close() + err := channelTicker.close() assert.Equal(t, nil, err) }() diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 3eaa79de0c..fae55cc9a1 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -268,14 +268,14 @@ func (node *Proxy) sendChannelsTimeTickLoop() { go func() { defer node.wg.Done() - timer := time.NewTicker(Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond)) - + ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond)) + defer ticker.Stop() for { select { case <-node.ctx.Done(): log.Info("send channels time tick loop exit") return - case <-timer.C: + case <-ticker.C: stats, ts, err := node.chTicker.getMinTsStatistics() if err != nil { log.Warn("sendChannelsTimeTickLoop.getMinTsStatistics", zap.Error(err)) diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 5ea80227a6..31bfdf5319 100644 --- 
a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -58,6 +58,7 @@ func (dh *distHandler) start(ctx context.Context) { logger := log.Ctx(ctx).With(zap.Int64("nodeID", dh.nodeID)).WithRateGroup("qnv2.distHandler", 1, 60) logger.Info("start dist handler") ticker := time.NewTicker(Params.QueryCoordCfg.DistPullInterval.GetAsDuration(time.Millisecond)) + defer ticker.Stop() failures := 0 for { select { diff --git a/internal/querycoordv2/job/scheduler.go b/internal/querycoordv2/job/scheduler.go index ec3606dfd1..64c8a1ced4 100644 --- a/internal/querycoordv2/job/scheduler.go +++ b/internal/querycoordv2/job/scheduler.go @@ -71,6 +71,7 @@ func (scheduler *Scheduler) schedule(ctx context.Context) { go func() { defer scheduler.wg.Done() ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() for { select { case <-ctx.Done(): diff --git a/internal/querycoordv2/observers/leader_observer.go b/internal/querycoordv2/observers/leader_observer.go index 292164cb27..b8ef22d996 100644 --- a/internal/querycoordv2/observers/leader_observer.go +++ b/internal/querycoordv2/observers/leader_observer.go @@ -50,6 +50,7 @@ func (o *LeaderObserver) Start(ctx context.Context) { go func() { defer o.wg.Done() ticker := time.NewTicker(interval) + defer ticker.Stop() for { select { case <-o.closeCh: diff --git a/internal/querycoordv2/observers/replica_observer.go b/internal/querycoordv2/observers/replica_observer.go index 1e099319b2..0e77176b7d 100644 --- a/internal/querycoordv2/observers/replica_observer.go +++ b/internal/querycoordv2/observers/replica_observer.go @@ -63,6 +63,7 @@ func (ob *ReplicaObserver) schedule(ctx context.Context) { log.Info("Start check replica loop") ticker := time.NewTicker(params.Params.QueryCoordCfg.CheckNodeInReplicaInterval.GetAsDuration(time.Second)) + defer ticker.Stop() for { select { case <-ctx.Done(): diff --git a/internal/querycoordv2/observers/resource_observer.go b/internal/querycoordv2/observers/resource_observer.go index 7b3a775852..e11be4f566 100644 --- a/internal/querycoordv2/observers/resource_observer.go +++ b/internal/querycoordv2/observers/resource_observer.go @@ -61,6 +61,7 @@ func (ob *ResourceObserver) schedule(ctx context.Context) { log.Info("Start check resource group loop") ticker := time.NewTicker(params.Params.QueryCoordCfg.CheckResourceGroupInterval.GetAsDuration(time.Second)) + defer ticker.Stop() for { select { case <-ctx.Done(): diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index c991d5e313..f3bdd0c4b7 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -89,6 +89,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) { log.Info("Start update next target loop") ticker := time.NewTicker(params.Params.QueryCoordCfg.UpdateNextTargetInterval.GetAsDuration(time.Second)) + defer ticker.Stop() for { select { case <-ctx.Done(): diff --git a/internal/querycoordv2/session/cluster.go b/internal/querycoordv2/session/cluster.go index 8833045792..ccbadef947 100644 --- a/internal/querycoordv2/session/cluster.go +++ b/internal/querycoordv2/session/cluster.go @@ -100,6 +100,7 @@ func (c *QueryCluster) Stop() { func (c *QueryCluster) updateLoop() { defer c.wg.Done() ticker := time.NewTicker(updateTickerDuration) + defer ticker.Stop() for { select { case <-c.ch: diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index a23cb7f195..0604a1dd26 100644 --- 
a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -215,6 +215,7 @@ func (c *Core) sendMinDdlTsAsTt() { func (c *Core) startTimeTickLoop() { defer c.wg.Done() ticker := time.NewTicker(Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond)) + defer ticker.Stop() for { select { case <-c.ctx.Done(): diff --git a/internal/util/funcutil/func.go b/internal/util/funcutil/func.go index 1e591a5eda..d2fa9d20ab 100644 --- a/internal/util/funcutil/func.go +++ b/internal/util/funcutil/func.go @@ -44,8 +44,10 @@ import ( // CheckGrpcReady wait for context timeout, or wait 100ms then send nil to targetCh func CheckGrpcReady(ctx context.Context, targetCh chan error) { + timer := time.NewTimer(100 * time.Millisecond) + defer timer.Stop() select { - case <-time.After(100 * time.Millisecond): + case <-timer.C: targetCh <- nil case <-ctx.Done(): return diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 5c533848c8..09d97ff26b 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -597,12 +597,10 @@ func (suite *SessionWithVersionSuite) TestWatchServicesWithVersionRange() { } }() - t := time.NewTimer(time.Second) - defer t.Stop() select { case evt := <-ch: suite.Equal(suite.sessions[1].ServerID, evt.Session.ServerID) - case <-t.C: + case <-time.After(time.Second): suite.Fail("no event received, failing") } })
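
Most of the hunks above follow one pattern: a ticker or timer created with time.NewTicker or time.NewTimer gets a matching Stop (usually deferred) so its runtime timer is released when the goroutine exits, and time.Tick (which can never be stopped) or time.After inside a loop (a fresh timer on every iteration) is replaced by a single reusable ticker or a context deadline. The sketch below only illustrates that shape; pollLoop, interval, and work are made-up names, not identifiers from this diff.

package main

import (
	"context"
	"fmt"
	"time"
)

// pollLoop runs work on every tick until ctx is cancelled or its deadline expires.
func pollLoop(ctx context.Context, interval time.Duration, work func()) {
	ticker := time.NewTicker(interval) // one timer reused for the whole loop
	defer ticker.Stop()                // released as soon as the loop returns
	for {
		select {
		case <-ctx.Done(): // covers both explicit cancellation and timeout
			return
		case <-ticker.C:
			work()
		}
	}
}

func main() {
	// A one-shot deadline is expressed as a context instead of a bare
	// time.After, in the same spirit as the indexnode and test changes above.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	pollLoop(ctx, 10*time.Millisecond, func() { fmt.Println("tick") })
}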