diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go
index 14dff255d9..0056bcffcf 100644
--- a/internal/querycoordv2/dist/dist_handler.go
+++ b/internal/querycoordv2/dist/dist_handler.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/samber/lo"
 	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
@@ -37,6 +38,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -91,7 +93,9 @@ func (dh *distHandler) start(ctx context.Context) {
 }
 
 func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask bool) {
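+	// time the distribution RPC and the local handling separately for the rated log below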
+	tr := timerecord.NewTimeRecorder("")
 	resp, err := dh.getDistribution(ctx)
+	d1 := tr.RecordSpan()
 	if err != nil {
 		node := dh.nodeManager.Get(dh.nodeID)
 		*failures = *failures + 1
@@ -100,11 +104,15 @@ func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask
 			fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat()))
 		}
 		fields = append(fields, zap.Error(err))
-		log.RatedWarn(30.0, "failed to get data distribution", fields...)
+		log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 60).
+			RatedWarn(30.0, "failed to get data distribution", fields...)
 	} else {
 		*failures = 0
 		dh.handleDistResp(ctx, resp, dispatchTask)
 	}
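+	// resp may be nil when the pull failed; proto.Size reports 0 for a nil message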
+	log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 120).
+		RatedInfo(120.0, "pull and handle distribution done",
+			zap.Int("respSize", proto.Size(resp)), zap.Duration("pullDur", d1), zap.Duration("handleDur", tr.RecordSpan()))
 }
 
 func (dh *distHandler) handleDistResp(ctx context.Context, resp *querypb.GetDataDistributionResponse, dispatchTask bool) {
diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go
index 27eae08be3..224fd0c77a 100644
--- a/internal/querycoordv2/meta/target.go
+++ b/internal/querycoordv2/meta/target.go
@@ -26,43 +26,70 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/proto/querypb"
+	"github.com/milvus-io/milvus/pkg/util/lock"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
-// CollectionTarget collection target is immutable,
+// CollectionTarget is an immutable snapshot of a collection's load target.
 type CollectionTarget struct {
-	segments   map[int64]*datapb.SegmentInfo
-	dmChannels map[string]*DmChannel
-	partitions typeutil.Set[int64] // stores target partitions info
-	version    int64
+	segments           map[int64]*datapb.SegmentInfo
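+	// channel2Segments and partition2Segments are indexes derived from segments,
+	// built once at construction so per-channel/per-partition reads avoid full scans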
+	channel2Segments   map[string][]*datapb.SegmentInfo
+	partition2Segments map[int64][]*datapb.SegmentInfo
+	dmChannels         map[string]*DmChannel
+	partitions         typeutil.Set[int64] // stores target partitions info
+	version            int64
 
-	// record target status, if target has been save before milvus v2.4.19, then the target will lack of segment info.
+	// record target status; if the target was saved before milvus v2.4.19, it will lack segment info.
 	lackSegmentInfo bool
 }
 
 func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
+	channel2Segments := make(map[string][]*datapb.SegmentInfo, len(dmChannels))
+	partition2Segments := make(map[int64][]*datapb.SegmentInfo, len(partitionIDs))
+	for _, segment := range segments {
+		channel := segment.GetInsertChannel()
+		channel2Segments[channel] = append(channel2Segments[channel], segment)
+		partitionID := segment.GetPartitionID()
+		partition2Segments[partitionID] = append(partition2Segments[partitionID], segment)
+	}
 	return &CollectionTarget{
-		segments:   segments,
-		dmChannels: dmChannels,
-		partitions: typeutil.NewSet(partitionIDs...),
-		version:    time.Now().UnixNano(),
+		segments:           segments,
+		channel2Segments:   channel2Segments,
+		partition2Segments: partition2Segments,
+		dmChannels:         dmChannels,
+		partitions:         typeutil.NewSet(partitionIDs...),
+		version:            time.Now().UnixNano(),
 	}
 }
 
 func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget {
 	segments := make(map[int64]*datapb.SegmentInfo)
 	dmChannels := make(map[string]*DmChannel)
+	channel2Segments := make(map[string][]*datapb.SegmentInfo)
+	partition2Segments := make(map[int64][]*datapb.SegmentInfo)
 	var partitions []int64
 
 	lackSegmentInfo := false
 	for _, t := range target.GetChannelTargets() {
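+		// pre-create the entry so a channel without segments still gets a non-nil slice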
+		if _, ok := channel2Segments[t.GetChannelName()]; !ok {
+			channel2Segments[t.GetChannelName()] = make([]*datapb.SegmentInfo, 0)
+		}
 		for _, partition := range t.GetPartitionTargets() {
+			if _, ok := partition2Segments[partition.GetPartitionID()]; !ok {
+				partition2Segments[partition.GetPartitionID()] = make([]*datapb.SegmentInfo, 0, len(partition.GetSegments()))
+			}
 			for _, segment := range partition.GetSegments() {
 				if segment.GetNumOfRows() <= 0 {
 					lackSegmentInfo = true
 				}
-				segments[segment.GetID()] = &datapb.SegmentInfo{
+				info := &datapb.SegmentInfo{
 					ID:            segment.GetID(),
 					Level:         segment.GetLevel(),
 					CollectionID:  target.GetCollectionID(),
@@ -70,6 +97,9 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
 					InsertChannel: t.GetChannelName(),
 					NumOfRows:     segment.GetNumOfRows(),
 				}
+				segments[segment.GetID()] = info
+				channel2Segments[t.GetChannelName()] = append(channel2Segments[t.GetChannelName()], info)
+				partition2Segments[partition.GetPartitionID()] = append(partition2Segments[partition.GetPartitionID()], info)
 			}
 			partitions = append(partitions, partition.GetPartitionID())
 		}
@@ -90,11 +120,13 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget
 	}
 
 	return &CollectionTarget{
-		segments:        segments,
-		dmChannels:      dmChannels,
-		partitions:      typeutil.NewSet(partitions...),
-		version:         target.GetVersion(),
-		lackSegmentInfo: lackSegmentInfo,
+		segments:           segments,
+		channel2Segments:   channel2Segments,
+		partition2Segments: partition2Segments,
+		dmChannels:         dmChannels,
+		partitions:         typeutil.NewSet(partitions...),
+		version:            target.GetVersion(),
+		lackSegmentInfo:    lackSegmentInfo,
 	}
 }
 
@@ -155,6 +187,14 @@ func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo {
 	return p.segments
 }
 
+func (p *CollectionTarget) GetChannelSegments(channel string) []*datapb.SegmentInfo {
+	return p.channel2Segments[channel]
+}
+
+func (p *CollectionTarget) GetPartitionSegments(partitionID int64) []*datapb.SegmentInfo {
+	return p.partition2Segments[partitionID]
+}
+
 func (p *CollectionTarget) GetTargetVersion() int64 {
 	return p.version
 }
@@ -181,39 +221,43 @@ func (p *CollectionTarget) Ready() bool {
 }
 
 type target struct {
+	keyLock *lock.KeyLock[int64] // guards updateCollectionTarget
 	// just maintain target at collection level
-	collectionTargetMap map[int64]*CollectionTarget
+	collectionTargetMap *typeutil.ConcurrentMap[int64, *CollectionTarget]
 }
 
 func newTarget() *target {
 	return &target{
-		collectionTargetMap: make(map[int64]*CollectionTarget),
+		keyLock:             lock.NewKeyLock[int64](),
+		collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
 	}
 }
 
 func (t *target) updateCollectionTarget(collectionID int64, target *CollectionTarget) {
-	if t.collectionTargetMap[collectionID] != nil && target.GetTargetVersion() <= t.collectionTargetMap[collectionID].GetTargetVersion() {
+	t.keyLock.Lock(collectionID)
+	defer t.keyLock.Unlock(collectionID)
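+	// the per-collection lock makes the version check and insert atomic,
+	// so a stale target (version <= current) can never overwrite a newer one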
+	if old, ok := t.collectionTargetMap.Get(collectionID); ok && old != nil && target.GetTargetVersion() <= old.GetTargetVersion() {
 		return
 	}
 
-	t.collectionTargetMap[collectionID] = target
+	t.collectionTargetMap.Insert(collectionID, target)
 }
 
 func (t *target) removeCollectionTarget(collectionID int64) {
-	delete(t.collectionTargetMap, collectionID)
+	t.collectionTargetMap.Remove(collectionID)
 }
 
 func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget {
-	return t.collectionTargetMap[collectionID]
+	ret, _ := t.collectionTargetMap.Get(collectionID)
+	return ret
 }
 
 func (t *target) toQueryCoordCollectionTargets(collectionID int64) []*metricsinfo.QueryCoordTarget {
-	var ret []*metricsinfo.QueryCoordTarget
-	for k, v := range t.collectionTargetMap {
+	targets := make([]*metricsinfo.QueryCoordTarget, 0, t.collectionTargetMap.Len())
+	t.collectionTargetMap.Range(func(k int64, v *CollectionTarget) bool {
 		if collectionID > 0 && collectionID != k {
-			continue
+			return true
 		}
-
 		segments := lo.MapToSlice(v.GetAllSegments(), func(k int64, s *datapb.SegmentInfo) *metricsinfo.Segment {
 			return metrics.NewSegmentFrom(s)
 		})
@@ -222,12 +266,13 @@ func (t *target) toQueryCoordCollectionTargets(collectionID int64) []*metricsinf
 			return metrics.NewDMChannelFrom(ch.VchannelInfo)
 		})
 
-		ret = append(ret, &metricsinfo.QueryCoordTarget{
+		qct := &metricsinfo.QueryCoordTarget{
 			CollectionID: k,
 			Segments:     segments,
 			DMChannels:   dmChannels,
-		})
-	}
-
-	return ret
+		}
+		targets = append(targets, qct)
+		return true
+	})
+	return targets
 }
diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go
index d76ac1f41f..8213ee869c 100644
--- a/internal/querycoordv2/meta/target_manager.go
+++ b/internal/querycoordv2/meta/target_manager.go
@@ -76,9 +76,8 @@ type TargetManagerInterface interface {
 }
 
 type TargetManager struct {
-	rwMutex sync.RWMutex
-	broker  Broker
-	meta    *Meta
+	broker Broker
+	meta   *Meta
 
 	// all read segment/channel operation happens on current -> only current target are visible to outer
 	// all add segment/channel operation happens on next -> changes can only happen on next target
@@ -100,8 +99,6 @@ func NewTargetManager(broker Broker, meta *Meta) *TargetManager {
 // WARN: DO NOT call this method for an existing collection as target observer running, or it will lead to a double-update,
 // which may make the current target not available
 func (mgr *TargetManager) UpdateCollectionCurrentTarget(ctx context.Context, collectionID int64) bool {
-	mgr.rwMutex.Lock()
-	defer mgr.rwMutex.Unlock()
 	log := log.With(zap.Int64("collectionID", collectionID))
 
 	log.Debug("start to update current target for collection")
@@ -157,8 +154,6 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec
 		return err
 	}
 
-	mgr.rwMutex.Lock()
-	defer mgr.rwMutex.Unlock()
 	partitions := mgr.meta.GetPartitionsByCollection(ctx, collectionID)
 	partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 {
 		return partition.PartitionID
@@ -188,7 +183,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec
 	}
 
 	for _, infos := range channelInfos {
-		merged := mgr.mergeDmChannelInfo(infos)
+		merged := mergeDmChannelInfo(infos)
 		dmChannels[merged.GetChannelName()] = merged
 	}
 
@@ -198,7 +193,9 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec
 	}
 
 	allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs)
+
 	mgr.next.updateCollectionTarget(collectionID, allocatedTarget)
+
 	log.Debug("finish to update next targets for collection",
 		zap.Int64("collectionID", collectionID),
 		zap.Int64s("PartitionIDs", partitionIDs))
@@ -206,7 +203,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec
 	return nil
 }
 
-func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel {
+func mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel {
 	var dmChannel *DmChannel
 
 	for _, info := range infos {
@@ -228,8 +225,6 @@ func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmCh
 
 // RemoveCollection removes all channels and segments in the given collection
 func (mgr *TargetManager) RemoveCollection(ctx context.Context, collectionID int64) {
-	mgr.rwMutex.Lock()
-	defer mgr.rwMutex.Unlock()
 	log.Info("remove collection from targets",
 		zap.Int64("collectionID", collectionID))
 
@@ -250,9 +245,6 @@ func (mgr *TargetManager) RemoveCollection(ctx context.Context, collectionID int
 // RemovePartition removes all segment in the given partition,
 // NOTE: this doesn't remove any channel even the given one is the only partition
 func (mgr *TargetManager) RemovePartition(ctx context.Context, collectionID int64, partitionIDs ...int64) {
-	mgr.rwMutex.Lock()
-	defer mgr.rwMutex.Unlock()
-
 	log := log.With(zap.Int64("collectionID", collectionID),
 		zap.Int64s("PartitionIDs", partitionIDs))
 
@@ -359,9 +351,6 @@ func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID in
 func (mgr *TargetManager) GetGrowingSegmentsByCollection(ctx context.Context, collectionID int64,
 	scope TargetScope,
 ) typeutil.UniqueSet {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	targets := mgr.getCollectionTarget(scope, collectionID)
 
 	for _, t := range targets {
@@ -382,9 +371,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(ctx context.Context, colle
 	channelName string,
 	scope TargetScope,
 ) typeutil.UniqueSet {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	targets := mgr.getCollectionTarget(scope, collectionID)
 	for _, t := range targets {
 		segments := typeutil.NewUniqueSet()
@@ -405,9 +391,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(ctx context.Context, colle
 func (mgr *TargetManager) GetSealedSegmentsByCollection(ctx context.Context, collectionID int64,
 	scope TargetScope,
 ) map[int64]*datapb.SegmentInfo {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	targets := mgr.getCollectionTarget(scope, collectionID)
 
 	for _, t := range targets {
@@ -421,17 +404,11 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(ctx context.Context, collec
 	channelName string,
 	scope TargetScope,
 ) map[int64]*datapb.SegmentInfo {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	targets := mgr.getCollectionTarget(scope, collectionID)
 	for _, t := range targets {
-		ret := make(map[int64]*datapb.SegmentInfo)
-		for k, v := range t.GetAllSegments() {
-			if v.GetInsertChannel() == channelName {
-				ret[k] = v
-			}
-		}
+		ret := lo.KeyBy(t.GetChannelSegments(channelName), func(s *datapb.SegmentInfo) int64 {
+			return s.GetID()
+		})
 
 		if len(ret) > 0 {
 			return ret
@@ -445,9 +422,6 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(ctx context.Context, colle
 	channelName string,
 	scope TargetScope,
 ) []int64 {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	targets := mgr.getCollectionTarget(scope, collectionID)
 	for _, t := range targets {
 		if channel, ok := t.dmChannels[channelName]; ok {
@@ -462,16 +436,11 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(ctx context.Context, coll
 	partitionID int64,
 	scope TargetScope,
 ) map[int64]*datapb.SegmentInfo {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	targets := mgr.getCollectionTarget(scope, collectionID)
 	for _, t := range targets {
 		segments := make(map[int64]*datapb.SegmentInfo)
-		for _, s := range t.GetAllSegments() {
-			if s.GetPartitionID() == partitionID {
-				segments[s.GetID()] = s
-			}
+		for _, s := range t.GetPartitionSegments(partitionID) {
+			segments[s.GetID()] = s
 		}
 
 		if len(segments) > 0 {
@@ -483,9 +452,6 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(ctx context.Context, coll
 }
 
 func (mgr *TargetManager) GetDmChannelsByCollection(ctx context.Context, collectionID int64, scope TargetScope) map[string]*DmChannel {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	targets := mgr.getCollectionTarget(scope, collectionID)
 
 	for _, t := range targets {
@@ -496,9 +462,6 @@ func (mgr *TargetManager) GetDmChannelsByCollection(ctx context.Context, collect
 }
 
 func (mgr *TargetManager) GetDmChannel(ctx context.Context, collectionID int64, channel string, scope TargetScope) *DmChannel {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	targets := mgr.getCollectionTarget(scope, collectionID)
 	for _, t := range targets {
 		if ch, ok := t.GetAllDmChannels()[channel]; ok {
@@ -509,9 +472,6 @@ func (mgr *TargetManager) GetDmChannel(ctx context.Context, collectionID int64,
 }
 
 func (mgr *TargetManager) GetSealedSegment(ctx context.Context, collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	targets := mgr.getCollectionTarget(scope, collectionID)
 	for _, t := range targets {
 		if s, ok := t.GetAllSegments()[id]; ok {
@@ -523,9 +483,6 @@ func (mgr *TargetManager) GetSealedSegment(ctx context.Context, collectionID int
 }
 
 func (mgr *TargetManager) GetCollectionTargetVersion(ctx context.Context, collectionID int64, scope TargetScope) int64 {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	targets := mgr.getCollectionTarget(scope, collectionID)
 	for _, t := range targets {
 		if t.GetTargetVersion() > 0 {
@@ -537,9 +494,6 @@ func (mgr *TargetManager) GetCollectionTargetVersion(ctx context.Context, collec
 }
 
 func (mgr *TargetManager) IsCurrentTargetExist(ctx context.Context, collectionID int64, partitionID int64) bool {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	targets := mgr.getCollectionTarget(CurrentTarget, collectionID)
 
 	return len(targets) > 0 && (targets[0].partitions.Contain(partitionID) || partitionID == common.AllPartitionsID) && len(targets[0].dmChannels) > 0
@@ -552,8 +506,6 @@ func (mgr *TargetManager) IsNextTargetExist(ctx context.Context, collectionID in
 }
 
 func (mgr *TargetManager) SaveCurrentTarget(ctx context.Context, catalog metastore.QueryCoordCatalog) {
-	mgr.rwMutex.Lock()
-	defer mgr.rwMutex.Unlock()
 	if mgr.current != nil {
 		// use pool here to control maximal writer used by save target
 		pool := conc.NewPool[any](runtime.GOMAXPROCS(0) * 2)
@@ -577,13 +529,14 @@ func (mgr *TargetManager) SaveCurrentTarget(ctx context.Context, catalog metasto
 			})
 		}
 		tasks := make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
-		for id, target := range mgr.current.collectionTargetMap {
+		mgr.current.collectionTargetMap.Range(func(id int64, target *CollectionTarget) bool {
 			tasks = append(tasks, typeutil.NewPair(id, target.toPbMsg()))
 			if len(tasks) >= batchSize {
 				submit(tasks)
 				tasks = make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
 			}
-		}
+			return true
+		})
 		if len(tasks) > 0 {
 			submit(tasks)
 		}
@@ -592,9 +545,6 @@ func (mgr *TargetManager) SaveCurrentTarget(ctx context.Context, catalog metasto
 }
 
 func (mgr *TargetManager) Recover(ctx context.Context, catalog metastore.QueryCoordCatalog) error {
-	mgr.rwMutex.Lock()
-	defer mgr.rwMutex.Unlock()
-
 	targets, err := catalog.GetCollectionTargets(ctx)
 	if err != nil {
 		log.Warn("failed to recover collection target from etcd", zap.Error(err))
@@ -623,8 +573,6 @@ func (mgr *TargetManager) Recover(ctx context.Context, catalog metastore.QueryCo
 
 // if segment isn't l0 segment, and exist in current/next target, then it can be moved
 func (mgr *TargetManager) CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool {
-	mgr.rwMutex.Lock()
-	defer mgr.rwMutex.Unlock()
 	current := mgr.current.getCollectionTarget(collectionID)
 	if current != nil && current.segments[segmentID] != nil && current.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 {
 		return true
@@ -639,9 +587,6 @@ func (mgr *TargetManager) CanSegmentBeMoved(ctx context.Context, collectionID, s
 }
 
 func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope, collectionID int64) string {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	ret := mgr.getTarget(scope)
 	if ret == nil {
 		return ""
@@ -656,9 +601,6 @@ func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope,
 }
 
 func (mgr *TargetManager) GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error) {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-
 	ret := mgr.getCollectionTarget(scope, collectionID)
 	if len(ret) == 0 {
 		return nil, merr.WrapErrCollectionNotLoaded(collectionID)
@@ -676,9 +618,7 @@ func (mgr *TargetManager) getTarget(scope TargetScope) *target {
 }
 
 func (mgr *TargetManager) IsCurrentTargetReady(ctx context.Context, collectionID int64) bool {
-	mgr.rwMutex.RLock()
-	defer mgr.rwMutex.RUnlock()
-	target, ok := mgr.current.collectionTargetMap[collectionID]
+	target, ok := mgr.current.collectionTargetMap.Get(collectionID)
 	if !ok {
 		return false
 	}
diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go
index a917b7a042..7fec53bf9d 100644
--- a/internal/querycoordv2/meta/target_manager_test.go
+++ b/internal/querycoordv2/meta/target_manager_test.go
@@ -425,33 +425,38 @@ func (suite *TargetManagerSuite) TestGetTarget() {
 	current := &CollectionTarget{}
 	next := &CollectionTarget{}
 
+	t1 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+	t2 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+	t3 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+	t4 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+	t1.Insert(1000, current)
+	t2.Insert(1000, next)
+	t3.Insert(1000, current)
+	t4.Insert(1000, current)
+
 	bothMgr := &TargetManager{
 		current: &target{
-			collectionTargetMap: map[int64]*CollectionTarget{
-				1000: current,
-			},
+			collectionTargetMap: t1,
 		},
 		next: &target{
-			collectionTargetMap: map[int64]*CollectionTarget{
-				1000: next,
-			},
+			collectionTargetMap: t2,
 		},
 	}
 	currentMgr := &TargetManager{
 		current: &target{
-			collectionTargetMap: map[int64]*CollectionTarget{
-				1000: current,
-			},
+			collectionTargetMap: t3,
+		},
+		next: &target{
+			collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
 		},
-		next: &target{},
 	}
 	nextMgr := &TargetManager{
 		next: &target{
-			collectionTargetMap: map[int64]*CollectionTarget{
-				1000: current,
-			},
+			collectionTargetMap: t4,
+		},
+		current: &target{
+			collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
 		},
-		current: &target{},
 	}
 
 	cases := []testCase{
@@ -728,7 +733,7 @@ func BenchmarkTargetManager(b *testing.B) {
 
 	collectionNum := 10000
 	for i := 0; i < collectionNum; i++ {
-		mgr.current.collectionTargetMap[int64(i)] = NewCollectionTarget(segments, channels, nil)
+		mgr.current.collectionTargetMap.Insert(int64(i), NewCollectionTarget(segments, channels, nil))
 	}
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go
index 4b86e6c225..10e7e53582 100644
--- a/internal/querycoordv2/observers/collection_observer.go
+++ b/internal/querycoordv2/observers/collection_observer.go
@@ -240,9 +240,13 @@ func (ob *CollectionObserver) readyToObserve(ctx context.Context, collectionID i
 
 func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
 	loading := false
+	observeTaskNum := 0
+	observeStart := time.Now()
 	ob.loadTasks.Range(func(traceID string, task LoadTask) bool {
 		loading = true
+		observeTaskNum++
 
+		start := time.Now()
 		collection := ob.meta.CollectionManager.GetCollection(ctx, task.CollectionID)
 		if collection == nil {
 			return true
@@ -296,9 +300,12 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) {
 			ob.loadTasks.Remove(traceID)
 		}
 
+		log.Info("observe collection done", zap.Int64("collectionID", task.CollectionID), zap.Duration("dur", time.Since(start)))
 		return true
 	})
 
+	log.Info("observe all collections done", zap.Int("num", observeTaskNum), zap.Duration("dur", time.Since(observeStart)))
+
 	// trigger check logic when loading collections/partitions
 	if loading {
 		ob.checkerController.Check()
@@ -325,11 +332,6 @@ func (ob *CollectionObserver) observeChannelStatus(ctx context.Context, collecti
 }
 
 func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32, channelTargetNum, subChannelCount int) bool {
-	log := log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).With(
-		zap.Int64("collectionID", partition.GetCollectionID()),
-		zap.Int64("partitionID", partition.GetPartitionID()),
-	)
-
 	segmentTargets := ob.targetMgr.GetSealedSegmentsByPartition(ctx, partition.GetCollectionID(), partition.GetPartitionID(), meta.NextTarget)
 
 	targetNum := len(segmentTargets) + channelTargetNum
@@ -338,7 +340,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 		return false
 	}
 
-	log.RatedInfo(10, "partition targets",
+	log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).RatedInfo(10, "partition targets",
+		zap.Int64("collectionID", partition.GetCollectionID()),
+		zap.Int64("partitionID", partition.GetPartitionID()),
 		zap.Int("segmentTargetNum", len(segmentTargets)),
 		zap.Int("channelTargetNum", channelTargetNum),
 		zap.Int("totalTargetNum", targetNum),
@@ -355,11 +359,6 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 		group := utils.GroupNodesByReplica(ctx, ob.meta.ReplicaManager, partition.GetCollectionID(), nodes)
 		loadedCount += len(group)
 	}
-	if loadedCount > 0 {
-		log.Info("partition load progress",
-			zap.Int("subChannelCount", subChannelCount),
-			zap.Int("loadSegmentCount", loadedCount-subChannelCount))
-	}
 	loadPercentage = int32(loadedCount * 100 / (targetNum * int(replicaNum)))
 
 	if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && loadPercentage != 100 {
@@ -370,30 +369,37 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 	ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
 	if loadPercentage == 100 {
 		if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) {
-			log.Warn("failed to manual check current target, skip update load status")
+			log.Ctx(ctx).Warn("failed to manual check current target, skip update load status",
+				zap.Int64("collectionID", partition.GetCollectionID()),
+				zap.Int64("partitionID", partition.GetPartitionID()))
 			return false
 		}
 		delete(ob.partitionLoadedCount, partition.GetPartitionID())
 	}
 	err := ob.meta.CollectionManager.UpdatePartitionLoadPercent(ctx, partition.PartitionID, loadPercentage)
 	if err != nil {
-		log.Warn("failed to update partition load percentage")
+		log.Ctx(ctx).Warn("failed to update partition load percentage",
+			zap.Int64("collectionID", partition.GetCollectionID()),
+			zap.Int64("partitionID", partition.GetPartitionID()))
 	}
-	log.Info("partition load status updated",
+	log.Ctx(ctx).Info("partition load status updated",
+		zap.Int64("collectionID", partition.GetCollectionID()),
+		zap.Int64("partitionID", partition.GetPartitionID()),
 		zap.Int32("partitionLoadPercentage", loadPercentage),
+		zap.Int("subChannelCount", subChannelCount),
+		zap.Int("loadSegmentCount", loadedCount-subChannelCount),
 	)
 	eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage)))
 	return true
 }
 
 func (ob *CollectionObserver) observeCollectionLoadStatus(ctx context.Context, collectionID int64) {
-	log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
-
 	collectionPercentage, err := ob.meta.CollectionManager.UpdateCollectionLoadPercent(ctx, collectionID)
 	if err != nil {
-		log.Warn("failed to update collection load percentage")
+		log.Ctx(ctx).Warn("failed to update collection load percentage", zap.Int64("collectionID", collectionID))
 	}
-	log.Info("collection load status updated",
+	log.Ctx(ctx).Info("collection load status updated",
+		zap.Int64("collectionID", collectionID),
 		zap.Int32("collectionLoadPercentage", collectionPercentage),
 	)
 	if collectionPercentage == 100 {
diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go
index 2c3f69c073..d8499fd491 100644
--- a/internal/querycoordv2/task/action.go
+++ b/internal/querycoordv2/task/action.go
@@ -24,7 +24,6 @@ import (
 
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/pkg/proto/querypb"
-	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -116,49 +115,7 @@ func (action *SegmentAction) GetScope() querypb.DataScope {
 }
 
 func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool {
-	if action.Type() == ActionTypeGrow {
-		// rpc finished
-		if !action.rpcReturned.Load() {
-			return false
-		}
-
-		// segment found in leader view
-		views := distMgr.LeaderViewManager.GetByFilter(
-			meta.WithChannelName2LeaderView(action.Shard),
-			meta.WithSegment2LeaderView(action.SegmentID, false))
-		if len(views) == 0 {
-			return false
-		}
-
-		// segment found in dist
-		segmentInTargetNode := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()), meta.WithSegmentID(action.SegmentID))
-		return len(segmentInTargetNode) > 0
-	} else if action.Type() == ActionTypeReduce {
-		// FIXME: Now shard leader's segment view is a map of segment ID to node ID,
-		// loading segment replaces the node ID with the new one,
-		// which confuses the condition of finishing,
-		// the leader should return a map of segment ID to list of nodes,
-		// now, we just always commit the release task to executor once.
-		// NOTE: DO NOT create a task containing release action and the action is not the last action
-		sealed := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()))
-		views := distMgr.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(action.Node()))
-		growing := lo.FlatMap(views, func(view *meta.LeaderView, _ int) []int64 {
-			return lo.Keys(view.GrowingSegments)
-		})
-		segments := make([]int64, 0, len(sealed)+len(growing))
-		for _, segment := range sealed {
-			segments = append(segments, segment.GetID())
-		}
-		segments = append(segments, growing...)
-		if !funcutil.SliceContain(segments, action.GetSegmentID()) {
-			return true
-		}
-		return action.rpcReturned.Load()
-	} else if action.Type() == ActionTypeUpdate {
-		return action.rpcReturned.Load()
-	}
-
-	return true
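+	// an action now counts as finished once its RPC has returned;
+	// the old distribution and leader-view checks are no longer applied here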
+	return action.rpcReturned.Load()
 }
 
 func (action *SegmentAction) Desc() string {
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index bb2e911398..404a0897bd 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -38,7 +38,9 @@ import (
 	"github.com/milvus-io/milvus/pkg/proto/querypb"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
+	"github.com/milvus-io/milvus/pkg/util/lock"
 	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	. "github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
@@ -91,6 +93,7 @@ type replicaChannelIndex struct {
 }
 
 type taskQueue struct {
+	mu sync.RWMutex
 	// TaskPriority -> TaskID -> Task
 	buckets []map[int64]Task
 }
@@ -106,6 +109,8 @@ func newTaskQueue() *taskQueue {
 }
 
 func (queue *taskQueue) Len() int {
+	queue.mu.RLock()
+	defer queue.mu.RUnlock()
 	taskNum := 0
 	for _, tasks := range queue.buckets {
 		taskNum += len(tasks)
@@ -115,17 +120,23 @@ func (queue *taskQueue) Len() int {
 }
 
 func (queue *taskQueue) Add(task Task) {
+	queue.mu.Lock()
+	defer queue.mu.Unlock()
 	bucket := queue.buckets[task.Priority()]
 	bucket[task.ID()] = task
 }
 
 func (queue *taskQueue) Remove(task Task) {
+	queue.mu.Lock()
+	defer queue.mu.Unlock()
 	bucket := queue.buckets[task.Priority()]
 	delete(bucket, task.ID())
 }
 
 // Range iterates all tasks in the queue ordered by priority from high to low
 func (queue *taskQueue) Range(fn func(task Task) bool) {
+	queue.mu.RLock()
+	defer queue.mu.RUnlock()
 	for priority := len(queue.buckets) - 1; priority >= 0; priority-- {
 		for _, task := range queue.buckets[priority] {
 			if !fn(task) {
@@ -153,9 +164,8 @@ type Scheduler interface {
 }
 
 type taskScheduler struct {
-	rwmutex     sync.RWMutex
 	ctx         context.Context
-	executors   map[int64]*Executor // NodeID -> Executor
+	executors   *ConcurrentMap[int64, *Executor] // NodeID -> Executor
 	idAllocator func() UniqueID
 
 	distMgr   *meta.DistributionManager
@@ -165,9 +175,11 @@ type taskScheduler struct {
 	cluster   session.Cluster
 	nodeMgr   *session.NodeManager
 
-	tasks        UniqueSet
-	segmentTasks map[replicaSegmentIndex]Task
-	channelTasks map[replicaChannelIndex]Task
+	scheduleMu   sync.Mutex           // guards schedule()
+	collKeyLock  *lock.KeyLock[int64] // guards Add()
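+	// task bookkeeping lives in concurrent maps so readers no longer contend with schedule()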
+	tasks        *ConcurrentMap[UniqueID, struct{}]
+	segmentTasks *ConcurrentMap[replicaSegmentIndex, Task]
+	channelTasks *ConcurrentMap[replicaChannelIndex, Task]
 	processQueue *taskQueue
 	waitQueue    *taskQueue
 	taskStats    *expirable.LRU[UniqueID, Task]
@@ -184,7 +196,7 @@ func NewScheduler(ctx context.Context,
 	id := time.Now().UnixMilli()
 	return &taskScheduler{
 		ctx:       ctx,
-		executors: make(map[int64]*Executor),
+		executors: NewConcurrentMap[int64, *Executor](),
 		idAllocator: func() UniqueID {
 			id++
 			return id
@@ -197,9 +209,10 @@ func NewScheduler(ctx context.Context,
 		cluster:   cluster,
 		nodeMgr:   nodeMgr,
 
-		tasks:        make(UniqueSet),
-		segmentTasks: make(map[replicaSegmentIndex]Task),
-		channelTasks: make(map[replicaChannelIndex]Task),
+		collKeyLock:  lock.NewKeyLock[int64](),
+		tasks:        NewConcurrentMap[UniqueID, struct{}](),
+		segmentTasks: NewConcurrentMap[replicaSegmentIndex, Task](),
+		channelTasks: NewConcurrentMap[replicaChannelIndex, Task](),
 		processQueue: newTaskQueue(),
 		waitQueue:    newTaskQueue(),
 		taskStats:    expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*15),
@@ -209,30 +222,22 @@ func NewScheduler(ctx context.Context,
 func (scheduler *taskScheduler) Start() {}
 
 func (scheduler *taskScheduler) Stop() {
-	scheduler.rwmutex.Lock()
-	defer scheduler.rwmutex.Unlock()
-
-	for nodeID, executor := range scheduler.executors {
+	scheduler.executors.Range(func(nodeID int64, executor *Executor) bool {
 		executor.Stop()
-		delete(scheduler.executors, nodeID)
-	}
+		return true
+	})
 
-	for _, task := range scheduler.segmentTasks {
+	scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
 		scheduler.remove(task)
-	}
-	for _, task := range scheduler.channelTasks {
+		return true
+	})
+	scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
 		scheduler.remove(task)
-	}
+		return true
+	})
 }
 
 func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
-	scheduler.rwmutex.Lock()
-	defer scheduler.rwmutex.Unlock()
-
-	if _, exist := scheduler.executors[nodeID]; exist {
-		return
-	}
-
 	executor := NewExecutor(scheduler.meta,
 		scheduler.distMgr,
 		scheduler.broker,
@@ -240,27 +245,24 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
 		scheduler.cluster,
 		scheduler.nodeMgr)
 
-	scheduler.executors[nodeID] = executor
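+	// GetOrInsert is atomic: if an executor already exists for this node, drop the new one and skip Start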
+	if _, exist := scheduler.executors.GetOrInsert(nodeID, executor); exist {
+		return
+	}
 	executor.Start(scheduler.ctx)
 	log.Ctx(scheduler.ctx).Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
 }
 
 func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
-	scheduler.rwmutex.Lock()
-	defer scheduler.rwmutex.Unlock()
-
-	executor, ok := scheduler.executors[nodeID]
+	executor, ok := scheduler.executors.GetAndRemove(nodeID)
 	if ok {
 		executor.Stop()
-		delete(scheduler.executors, nodeID)
 		log.Ctx(scheduler.ctx).Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
 	}
 }
 
 func (scheduler *taskScheduler) Add(task Task) error {
-	scheduler.rwmutex.Lock()
-	defer scheduler.rwmutex.Unlock()
-
+	scheduler.collKeyLock.Lock(task.CollectionID())
+	defer scheduler.collKeyLock.Unlock(task.CollectionID())
 	err := scheduler.preAdd(task)
 	if err != nil {
 		task.Cancel(err)
@@ -269,19 +271,19 @@ func (scheduler *taskScheduler) Add(task Task) error {
 
 	task.SetID(scheduler.idAllocator())
 	scheduler.waitQueue.Add(task)
-	scheduler.tasks.Insert(task.ID())
+	scheduler.tasks.Insert(task.ID(), struct{}{})
 	switch task := task.(type) {
 	case *SegmentTask:
 		index := NewReplicaSegmentIndex(task)
-		scheduler.segmentTasks[index] = task
+		scheduler.segmentTasks.Insert(index, task)
 
 	case *ChannelTask:
 		index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
-		scheduler.channelTasks[index] = task
+		scheduler.channelTasks.Insert(index, task)
 
 	case *LeaderTask:
 		index := NewReplicaLeaderIndex(task)
-		scheduler.segmentTasks[index] = task
+		scheduler.segmentTasks.Insert(index, task)
 	}
 
 	scheduler.taskStats.Add(task.ID(), task)
@@ -292,21 +294,39 @@ func (scheduler *taskScheduler) Add(task Task) error {
 }
 
 func (scheduler *taskScheduler) updateTaskMetrics() {
-	segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0
+	segmentGrowNum, segmentReduceNum, segmentUpdateNum, segmentMoveNum := 0, 0, 0, 0
+	leaderGrowNum, leaderReduceNum, leaderUpdateNum := 0, 0, 0
 	channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0
-	for _, task := range scheduler.segmentTasks {
-		taskType := GetTaskType(task)
-		switch taskType {
-		case TaskTypeGrow:
-			segmentGrowNum++
-		case TaskTypeReduce:
-			segmentReduceNum++
-		case TaskTypeMove:
+	scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
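+		// a task with more than one action is a move (grow on the target node plus reduce on the source)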
+		switch {
+		case len(task.Actions()) > 1:
 			segmentMoveNum++
+		case task.Actions()[0].Type() == ActionTypeGrow:
+			if _, ok := task.Actions()[0].(*SegmentAction); ok {
+				segmentGrowNum++
+			}
+			if _, ok := task.Actions()[0].(*LeaderAction); ok {
+				leaderGrowNum++
+			}
+		case task.Actions()[0].Type() == ActionTypeReduce:
+			if _, ok := task.Actions()[0].(*SegmentAction); ok {
+				segmentReduceNum++
+			}
+			if _, ok := task.Actions()[0].(*LeaderAction); ok {
+				leaderReduceNum++
+			}
+		case task.Actions()[0].Type() == ActionTypeUpdate:
+			if _, ok := task.Actions()[0].(*SegmentAction); ok {
+				segmentUpdateNum++
+			}
+			if _, ok := task.Actions()[0].(*LeaderAction); ok {
+				leaderUpdateNum++
+			}
 		}
-	}
+		return true
+	})
 
-	for _, task := range scheduler.channelTasks {
+	scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
 		taskType := GetTaskType(task)
 		switch taskType {
 		case TaskTypeGrow:
@@ -316,11 +336,18 @@ func (scheduler *taskScheduler) updateTaskMetrics() {
 		case TaskTypeMove:
 			channelMoveNum++
 		}
-	}
+		return true
+	})
 
 	metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentGrowTaskLabel).Set(float64(segmentGrowNum))
 	metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentReduceTaskLabel).Set(float64(segmentReduceNum))
 	metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentMoveTaskLabel).Set(float64(segmentMoveNum))
+	metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentUpdateTaskLabel).Set(float64(segmentUpdateNum))
+
+	metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderGrowTaskLabel).Set(float64(leaderGrowNum))
+	metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderReduceTaskLabel).Set(float64(leaderReduceNum))
+	metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderUpdateTaskLabel).Set(float64(leaderUpdateNum))
+
 	metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelGrowTaskLabel).Set(float64(channelGrowNum))
 	metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelReduceTaskLabel).Set(float64(channelReduceNum))
 	metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelMoveTaskLabel).Set(float64(channelMoveNum))
@@ -332,7 +359,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
 	switch task := task.(type) {
 	case *SegmentTask:
 		index := NewReplicaSegmentIndex(task)
-		if old, ok := scheduler.segmentTasks[index]; ok {
+		if old, ok := scheduler.segmentTasks.Get(index); ok {
 			if task.Priority() > old.Priority() {
 				log.Ctx(scheduler.ctx).Info("replace old task, the new one with higher priority",
 					zap.Int64("oldID", old.ID()),
@@ -365,7 +392,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
 
 	case *ChannelTask:
 		index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
-		if old, ok := scheduler.channelTasks[index]; ok {
+		if old, ok := scheduler.channelTasks.Get(index); ok {
 			if task.Priority() > old.Priority() {
 				log.Ctx(scheduler.ctx).Info("replace old task, the new one with higher priority",
 					zap.Int64("oldID", old.ID()),
@@ -398,7 +425,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
 		}
 	case *LeaderTask:
 		index := NewReplicaLeaderIndex(task)
-		if old, ok := scheduler.segmentTasks[index]; ok {
+		if old, ok := scheduler.segmentTasks.Get(index); ok {
 			if task.Priority() > old.Priority() {
 				log.Ctx(scheduler.ctx).Info("replace old task, the new one with higher priority",
 					zap.Int64("oldID", old.ID()),
@@ -477,46 +504,42 @@ func (scheduler *taskScheduler) Dispatch(node int64) {
 		log.Ctx(scheduler.ctx).Info("scheduler stopped")
 
 	default:
-		scheduler.rwmutex.Lock()
-		defer scheduler.rwmutex.Unlock()
+		scheduler.scheduleMu.Lock()
+		defer scheduler.scheduleMu.Unlock()
 		scheduler.schedule(node)
 	}
 }
 
 func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) int {
-	scheduler.rwmutex.RLock()
-	defer scheduler.rwmutex.RUnlock()
-
 	targetActions := make(map[int64][]Action)
-	for _, task := range scheduler.segmentTasks { // Map key: replicaSegmentIndex
+	scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
 		taskCollID := task.CollectionID()
 		if collectionID != -1 && collectionID != taskCollID {
-			continue
+			return true
 		}
 		actions := filterActions(task.Actions(), nodeID)
 		if len(actions) > 0 {
 			targetActions[taskCollID] = append(targetActions[taskCollID], actions...)
 		}
-	}
+		return true
+	})
 
 	return scheduler.calculateTaskDelta(targetActions)
 }
 
 func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int {
-	scheduler.rwmutex.RLock()
-	defer scheduler.rwmutex.RUnlock()
-
 	targetActions := make(map[int64][]Action)
-	for _, task := range scheduler.channelTasks { // Map key: replicaChannelIndex
+	scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
 		taskCollID := task.CollectionID()
 		if collectionID != -1 && collectionID != taskCollID {
-			continue
+			return true
 		}
 		actions := filterActions(task.Actions(), nodeID)
 		if len(actions) > 0 {
 			targetActions[taskCollID] = append(targetActions[taskCollID], actions...)
 		}
-	}
+		return true
+	})
 
 	return scheduler.calculateTaskDelta(targetActions)
 }
@@ -561,10 +584,7 @@ func (scheduler *taskScheduler) calculateTaskDelta(targetActions map[int64][]Act
 }
 
 func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
-	scheduler.rwmutex.RLock()
-	defer scheduler.rwmutex.RUnlock()
-
-	executor, ok := scheduler.executors[nodeID]
+	executor, ok := scheduler.executors.Get(nodeID)
 	if !ok {
 		return nil
 	}
@@ -587,16 +607,13 @@ func WithTaskTypeFilter(taskType Type) TaskFilter {
 }
 
 func (scheduler *taskScheduler) GetChannelTaskNum(filters ...TaskFilter) int {
-	scheduler.rwmutex.RLock()
-	defer scheduler.rwmutex.RUnlock()
-
 	if len(filters) == 0 {
-		return len(scheduler.channelTasks)
+		return scheduler.channelTasks.Len()
 	}
 
-	// rewrite this with for loop
+	// count only the tasks that match every filter
 	counter := 0
-	for _, task := range scheduler.channelTasks {
+	scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
 		allMatch := true
 		for _, filter := range filters {
 			if !filter(task) {
@@ -607,21 +624,19 @@ func (scheduler *taskScheduler) GetChannelTaskNum(filters ...TaskFilter) int {
 		if allMatch {
 			counter++
 		}
-	}
+		return true
+	})
 	return counter
 }
 
 func (scheduler *taskScheduler) GetSegmentTaskNum(filters ...TaskFilter) int {
-	scheduler.rwmutex.RLock()
-	defer scheduler.rwmutex.RUnlock()
-
 	if len(filters) == 0 {
-		return len(scheduler.segmentTasks)
+		return scheduler.segmentTasks.Len()
 	}
 
-	// rewrite this with for loop
+	// count only the tasks that match every filter
 	counter := 0
-	for _, task := range scheduler.segmentTasks {
+	scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
 		allMatch := true
 		for _, filter := range filters {
 			if !filter(task) {
@@ -632,7 +647,8 @@ func (scheduler *taskScheduler) GetSegmentTaskNum(filters ...TaskFilter) int {
 		if allMatch {
 			counter++
 		}
-	}
+		return true
+	})
 	return counter
 }
 
@@ -657,17 +673,19 @@ func (scheduler *taskScheduler) schedule(node int64) {
 		return
 	}
 
+	tr := timerecord.NewTimeRecorder("")
 	log := log.Ctx(scheduler.ctx).With(
 		zap.Int64("nodeID", node),
 	)
 
 	scheduler.tryPromoteAll()
+	promoteDur := tr.RecordSpan()
 
 	log.Debug("process tasks related to node",
 		zap.Int("processingTaskNum", scheduler.processQueue.Len()),
 		zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
-		zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
-		zap.Int("channelTaskNum", len(scheduler.channelTasks)),
+		zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()),
+		zap.Int("channelTaskNum", scheduler.channelTasks.Len()),
 	)
 
 	// Process tasks
@@ -683,6 +701,7 @@ func (scheduler *taskScheduler) schedule(node int64) {
 
 		return true
 	})
+	preprocessDur := tr.RecordSpan()
 
 	// The scheduler doesn't limit the number of tasks,
 	// to commit tasks to executors as soon as possible, to reach higher merge possibility
@@ -693,22 +712,29 @@ func (scheduler *taskScheduler) schedule(node int64) {
 		}
 		return nil
 	}, "process")
+	processDur := tr.RecordSpan()
 
 	for _, task := range toRemove {
 		scheduler.remove(task)
 	}
 
+	scheduler.updateTaskMetrics()
+
 	log.Info("processed tasks",
 		zap.Int("toProcessNum", len(toProcess)),
 		zap.Int32("committedNum", commmittedNum.Load()),
 		zap.Int("toRemoveNum", len(toRemove)),
+		zap.Duration("promoteDur", promoteDur),
+		zap.Duration("preprocessDUr", preprocessDur),
+		zap.Duration("processDUr", processDur),
+		zap.Duration("totalDur", tr.ElapseSpan()),
 	)
 
 	log.Info("process tasks related to node done",
 		zap.Int("processingTaskNum", scheduler.processQueue.Len()),
 		zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
-		zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
-		zap.Int("channelTaskNum", len(scheduler.channelTasks)),
+		zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()),
+		zap.Int("channelTaskNum", scheduler.channelTasks.Len()),
 	)
 }
 
@@ -749,10 +775,6 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool {
 // return true if the task should be executed,
 // false otherwise
 func (scheduler *taskScheduler) preProcess(task Task) bool {
-	log := log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).With(
-		zap.Int64("collectionID", task.CollectionID()),
-		zap.Int64("taskID", task.ID()),
-	)
 	if task.Status() != TaskStatusStarted {
 		return false
 	}
@@ -775,7 +797,9 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
 			}
 
 			if !ready {
-				log.RatedInfo(30, "Blocking reduce action in balance channel task")
+				log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).RatedInfo(30, "Blocking reduce action in balance channel task",
+					zap.Int64("collectionID", task.CollectionID()),
+					zap.Int64("taskID", task.ID()))
 				break
 			}
 		}
@@ -806,7 +830,7 @@ func (scheduler *taskScheduler) process(task Task) bool {
 	)
 
 	actions, step := task.Actions(), task.Step()
-	executor, ok := scheduler.executors[actions[step].Node()]
+	executor, ok := scheduler.executors.Get(actions[step].Node())
 	if !ok {
 		log.Warn("no executor for QueryNode",
 			zap.Int("step", step),
@@ -827,19 +851,18 @@ func (scheduler *taskScheduler) check(task Task) error {
 }
 
 func (scheduler *taskScheduler) RemoveByNode(node int64) {
-	scheduler.rwmutex.Lock()
-	defer scheduler.rwmutex.Unlock()
-
-	for _, task := range scheduler.segmentTasks {
+	scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
 		if scheduler.isRelated(task, node) {
 			scheduler.remove(task)
 		}
-	}
-	for _, task := range scheduler.channelTasks {
+		return true
+	})
+	scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
 		if scheduler.isRelated(task, node) {
 			scheduler.remove(task)
 		}
-	}
+		return true
+	})
 }
 
 func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) {
@@ -875,7 +898,7 @@ func (scheduler *taskScheduler) remove(task Task) {
 	switch task := task.(type) {
 	case *SegmentTask:
 		index := NewReplicaSegmentIndex(task)
-		delete(scheduler.segmentTasks, index)
+		scheduler.segmentTasks.Remove(index)
 		log = log.With(zap.Int64("segmentID", task.SegmentID()))
 		if task.Status() == TaskStatusFailed &&
 			task.Err() != nil &&
@@ -885,16 +908,15 @@ func (scheduler *taskScheduler) remove(task Task) {
 
 	case *ChannelTask:
 		index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
-		delete(scheduler.channelTasks, index)
+		scheduler.channelTasks.Remove(index)
 		log = log.With(zap.String("channel", task.Channel()))
 
 	case *LeaderTask:
 		index := NewReplicaLeaderIndex(task)
-		delete(scheduler.segmentTasks, index)
+		scheduler.segmentTasks.Remove(index)
 		log = log.With(zap.Int64("segmentID", task.SegmentID()))
 	}
 
-	scheduler.updateTaskMetrics()
 	log.Info("task removed")
 
 	if scheduler.meta.Exist(task.Context(), task.CollectionID()) {
@@ -940,14 +962,18 @@ func (scheduler *taskScheduler) getTaskMetricsLabel(task Task) string {
 	return metrics.UnknownTaskLabel
 }
 
-func (scheduler *taskScheduler) checkStale(task Task) error {
-	log := log.Ctx(task.Context()).With(
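+// WrapTaskLog prepends the task's identifying fields (ID, collection, replica, source) to the given zap fields.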
+func WrapTaskLog(task Task, fields ...zap.Field) []zap.Field {
+	res := []zap.Field{
 		zap.Int64("taskID", task.ID()),
 		zap.Int64("collectionID", task.CollectionID()),
 		zap.Int64("replicaID", task.ReplicaID()),
 		zap.String("source", task.Source().String()),
-	)
+	}
+	res = append(res, fields...)
+	return res
+}
 
+func (scheduler *taskScheduler) checkStale(task Task) error {
 	switch task := task.(type) {
 	case *SegmentTask:
 		if err := scheduler.checkSegmentTaskStale(task); err != nil {
@@ -974,7 +1000,9 @@ func (scheduler *taskScheduler) checkStale(task Task) error {
 			zap.Int("step", step))
 
 		if scheduler.nodeMgr.Get(action.Node()) == nil {
-			log.Warn("the task is stale, the target node is offline")
+			log.Warn("the task is stale, the target node is offline", WrapTaskLog(task,
+				zap.Int64("nodeID", action.Node()),
+				zap.Int("step", step))...)
 			return merr.WrapErrNodeNotFound(action.Node())
 		}
 	}
@@ -983,38 +1011,30 @@ func (scheduler *taskScheduler) checkStale(task Task) error {
 }
 
 func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error {
-	log := log.Ctx(task.Context()).With(
-		zap.Int64("taskID", task.ID()),
-		zap.Int64("collectionID", task.CollectionID()),
-		zap.Int64("replicaID", task.ReplicaID()),
-		zap.String("source", task.Source().String()),
-	)
-
 	for _, action := range task.Actions() {
 		switch action.Type() {
 		case ActionTypeGrow:
 			if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
-				log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID))
+				log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.Int64("segment", task.segmentID))...)
 				return merr.WrapErrNodeOffline(action.Node())
 			}
 			taskType := GetTaskType(task)
 			segment := scheduler.targetMgr.GetSealedSegment(task.ctx, task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst)
 			if segment == nil {
-				log.Warn("task stale due to the segment to load not exists in targets",
-					zap.Int64("segment", task.segmentID),
-					zap.String("taskType", taskType.String()),
-				)
+				log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets",
+					WrapTaskLog(task, zap.Int64("segment", task.segmentID),
+						zap.String("taskType", taskType.String()))...)
 				return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment")
 			}
 
 			replica := scheduler.meta.ReplicaManager.GetByCollectionAndNode(task.ctx, task.CollectionID(), action.Node())
 			if replica == nil {
-				log.Warn("task stale due to replica not found")
+				log.Ctx(task.Context()).Warn("task stale due to replica not found", WrapTaskLog(task)...)
 				return merr.WrapErrReplicaNotFound(task.CollectionID(), "by collectionID")
 			}
 			_, ok := scheduler.distMgr.GetShardLeader(replica, segment.GetInsertChannel())
 			if !ok {
-				log.Warn("task stale due to leader not found")
+				log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task)...)
 				return merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "failed to get shard delegator")
 			}
 
@@ -1026,23 +1046,16 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error {
 }
 
 func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error {
-	log := log.Ctx(task.Context()).With(
-		zap.Int64("taskID", task.ID()),
-		zap.Int64("collectionID", task.CollectionID()),
-		zap.Int64("replicaID", task.ReplicaID()),
-		zap.String("source", task.Source().String()),
-	)
-
 	for _, action := range task.Actions() {
 		switch action.Type() {
 		case ActionTypeGrow:
 			if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok {
-				log.Warn("task stale due to node offline", zap.String("channel", task.Channel()))
+				log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.String("channel", task.Channel()))...)
 				return merr.WrapErrNodeOffline(action.Node())
 			}
 			if scheduler.targetMgr.GetDmChannel(task.ctx, task.collectionID, task.Channel(), meta.NextTargetFirst) == nil {
-				log.Warn("the task is stale, the channel to subscribe not exists in targets",
-					zap.String("channel", task.Channel()))
+				log.Ctx(task.Context()).Warn("the task is stale, the channel to subscribe not exists in targets",
+					WrapTaskLog(task, zap.String("channel", task.Channel()))...)
 				return merr.WrapErrChannelReduplicate(task.Channel(), "target doesn't contain this channel")
 			}
 
@@ -1054,48 +1067,41 @@ func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error {
 }
 
 func (scheduler *taskScheduler) checkLeaderTaskStale(task *LeaderTask) error {
-	log := log.Ctx(task.Context()).With(
-		zap.Int64("taskID", task.ID()),
-		zap.Int64("collectionID", task.CollectionID()),
-		zap.Int64("replicaID", task.ReplicaID()),
-		zap.String("source", task.Source().String()),
-		zap.Int64("leaderID", task.leaderID),
-	)
-
 	for _, action := range task.Actions() {
 		switch action.Type() {
 		case ActionTypeGrow:
 			if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.(*LeaderAction).GetLeaderID()); ok {
-				log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID))
+				log.Ctx(task.Context()).Warn("task stale due to node offline",
+					WrapTaskLog(task, zap.Int64("leaderID", task.leaderID), zap.Int64("segment", task.segmentID))...)
 				return merr.WrapErrNodeOffline(action.Node())
 			}
 
 			taskType := GetTaskType(task)
 			segment := scheduler.targetMgr.GetSealedSegment(task.ctx, task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst)
 			if segment == nil {
-				log.Warn("task stale due to the segment to load not exists in targets",
-					zap.Int64("segment", task.segmentID),
-					zap.String("taskType", taskType.String()),
-				)
+				log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets",
+					WrapTaskLog(task, zap.Int64("leaderID", task.leaderID),
+						zap.Int64("segment", task.segmentID),
+						zap.String("taskType", taskType.String()))...)
 				return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment")
 			}
 
 			replica := scheduler.meta.ReplicaManager.GetByCollectionAndNode(task.ctx, task.CollectionID(), action.Node())
 			if replica == nil {
-				log.Warn("task stale due to replica not found")
+				log.Ctx(task.Context()).Warn("task stale due to replica not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
 				return merr.WrapErrReplicaNotFound(task.CollectionID(), "by collectionID")
 			}
 
 			view := scheduler.distMgr.GetLeaderShardView(task.leaderID, task.Shard())
 			if view == nil {
-				log.Warn("task stale due to leader not found")
+				log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
 				return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator")
 			}
 
 		case ActionTypeReduce:
 			view := scheduler.distMgr.GetLeaderShardView(task.leaderID, task.Shard())
 			if view == nil {
-				log.Warn("task stale due to leader not found")
+				log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...)
 				return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator")
 			}
 		}
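
The stale checks above drop their per-function log.With wrappers in favor of a shared WrapTaskLog helper whose definition is not part of these hunks. A plausible sketch, reconstructed from the fields the removed wrappers carried (taskID, collectionID, replicaID, source), with call-site extras such as channel or leaderID appended by the caller; the real helper may differ:

package task

import "go.uber.org/zap"

// WrapTaskLog bundles the common per-task fields into a field slice and
// appends any call-site-specific fields. Sketch only, not the PR's definition.
func WrapTaskLog(task Task, fields ...zap.Field) []zap.Field {
	ret := []zap.Field{
		zap.Int64("taskID", task.ID()),
		zap.Int64("collectionID", task.CollectionID()),
		zap.Int64("replicaID", task.ReplicaID()),
		zap.String("source", task.Source().String()),
	}
	return append(ret, fields...)
}
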
diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go
index 25e64d5be1..c5fa610c20 100644
--- a/internal/querycoordv2/task/task.go
+++ b/internal/querycoordv2/task/task.go
@@ -134,12 +134,14 @@ type baseTask struct {
 	name string
 
 	// startTs
-	startTs time.Time
+	startTs atomic.Time
 }
 
 func newBaseTask(ctx context.Context, source Source, collectionID typeutil.UniqueID, replica *meta.Replica, shard string, taskTag string) *baseTask {
 	ctx, cancel := context.WithCancel(ctx)
 	ctx, span := otel.Tracer(typeutil.QueryCoordRole).Start(ctx, taskTag)
+	startTs := atomic.Time{}
+	startTs.Store(time.Now())
 
 	return &baseTask{
 		source:       source,
@@ -154,7 +156,7 @@ func newBaseTask(ctx context.Context, source Source, collectionID typeutil.Uniqu
 		doneCh:   make(chan struct{}),
 		canceled: atomic.NewBool(false),
 		span:     span,
-		startTs:  time.Now(),
+		startTs:  startTs,
 	}
 }
 
@@ -217,11 +219,11 @@ func (task *baseTask) Index() string {
 }
 
 func (task *baseTask) RecordStartTs() {
-	task.startTs = time.Now()
+	task.startTs.Store(time.Now())
 }
 
 func (task *baseTask) GetTaskLatency() int64 {
-	return time.Since(task.startTs).Milliseconds()
+	return time.Since(task.startTs.Load()).Milliseconds()
 }
 
 func (task *baseTask) Err() error {
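
The startTs field switches to go.uber.org/atomic's Time, presumably because RecordStartTs and GetTaskLatency can now be called from different goroutines, where a plain time.Time read concurrent with a write would be a data race. A minimal standalone sketch of the pattern the hunk above uses:

package main

import (
	"fmt"
	"time"

	"go.uber.org/atomic"
)

func main() {
	// The zero value is usable; Store and Load are safe without extra locking.
	startTs := atomic.Time{}
	startTs.Store(time.Now())

	time.Sleep(10 * time.Millisecond)
	fmt.Println(time.Since(startTs.Load()).Milliseconds()) // ~10
}
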
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index 9b3b736922..c6c933dfb3 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -771,26 +771,14 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() {
 		suite.NoError(err)
 	}
 
-	growings := map[int64]*meta.Segment{}
-	for _, segment := range suite.releaseSegments[1:] {
-		growings[segment] = utils.CreateTestSegment(suite.collection, 1, segment, targetNode, 1, "")
-	}
-	suite.dist.LeaderViewManager.Update(targetNode, &meta.LeaderView{
-		ID:              targetNode,
-		GrowingSegments: growings,
-	})
-
 	segmentsNum := len(suite.releaseSegments)
 	suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
 
-	// Process tasks
+	// Process tasks; release is done in this dispatch
 	suite.dispatchAndWait(targetNode)
-	suite.AssertTaskNum(segmentsNum-1, 0, 0, segmentsNum-1)
+	suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
 
-	// Release done
-	suite.dist.LeaderViewManager.Update(targetNode)
-
-	// Process tasks done
+	// Tasks removed
 	suite.dispatchAndWait(targetNode)
 	suite.AssertTaskNum(0, 0, 0, 0)
 
@@ -1090,7 +1078,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
 			CollectionID: suite.collection,
 		},
 	}, nil)
-	for _, segment := range suite.loadSegments {
+	for _, segment := range suite.loadSegments[1:] {
 		suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{
 			{
 				ID:            segment,
@@ -1111,13 +1099,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
 	}))
 	suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
 	tasks := []Task{}
-	segments := make([]*datapb.SegmentInfo, 0)
 	for _, segment := range suite.loadSegments {
-		segments = append(segments, &datapb.SegmentInfo{
-			ID:            segment,
-			PartitionID:   1,
-			InsertChannel: channel.GetChannelName(),
-		})
 		task, err := NewSegmentTask(
 			ctx,
 			timeout,
@@ -1131,33 +1113,8 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
 		err = suite.scheduler.Add(task)
 		suite.NoError(err)
 	}
-	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
-	suite.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(suite.collection, partition))
-	suite.target.UpdateCollectionNextTarget(ctx, suite.collection)
-	segmentsNum := len(suite.loadSegments)
-	suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
 
-	// Process tasks
-	suite.dispatchAndWait(targetNode)
-	suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
-
-	// Process tasks done
-	// Dist contains channels, first task stale
-	view := &meta.LeaderView{
-		ID:           targetNode,
-		CollectionID: suite.collection,
-		Segments:     map[int64]*querypb.SegmentDist{},
-		Channel:      channel.ChannelName,
-	}
-	for _, segment := range suite.loadSegments[1:] {
-		view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
-	}
-	distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
-		return meta.SegmentFromInfo(info)
-	})
-	suite.dist.LeaderViewManager.Update(targetNode, view)
-	suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
-	segments = make([]*datapb.SegmentInfo, 0)
+	segments := make([]*datapb.SegmentInfo, 0)
 	for _, segment := range suite.loadSegments[1:] {
 		segments = append(segments, &datapb.SegmentInfo{
 			ID:            segment,
@@ -1165,13 +1122,16 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
 			InsertChannel: channel.GetChannelName(),
 		})
 	}
-	bakExpectations := suite.broker.ExpectedCalls
-	suite.broker.AssertExpectations(suite.T())
-	suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0]
 	suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil)
 
 	suite.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(suite.collection, 2))
 	suite.target.UpdateCollectionNextTarget(ctx, suite.collection)
+
+	// Process tasks done
+	suite.dispatchAndWait(targetNode)
+	suite.AssertTaskNum(1, 0, 0, 1)
+
+	// Task removed
 	suite.dispatchAndWait(targetNode)
 	suite.AssertTaskNum(0, 0, 0, 0)
 
@@ -1184,7 +1144,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
 			suite.NoError(task.Err())
 		}
 	}
-	suite.broker.ExpectedCalls = bakExpectations
 }
 
 func (suite *TaskSuite) TestChannelTaskReplace() {
@@ -1497,10 +1456,10 @@ func (suite *TaskSuite) AssertTaskNum(process, wait, channel, segment int) {
 
 	suite.Equal(process, scheduler.processQueue.Len())
 	suite.Equal(wait, scheduler.waitQueue.Len())
-	suite.Len(scheduler.segmentTasks, segment)
-	suite.Len(scheduler.channelTasks, channel)
-	suite.Equal(len(scheduler.tasks), process+wait)
-	suite.Equal(len(scheduler.tasks), segment+channel)
+	suite.Equal(segment, scheduler.segmentTasks.Len())
+	suite.Equal(channel, scheduler.channelTasks.Len())
+	suite.Equal(process+wait, scheduler.tasks.Len())
+	suite.Equal(segment+channel, scheduler.tasks.Len())
 }
 
 func (suite *TaskSuite) dispatchAndWait(node int64) {
@@ -1512,13 +1471,14 @@ func (suite *TaskSuite) dispatchAndWait(node int64) {
 		count = 0
 		keys = make([]any, 0)
 
-		for _, executor := range suite.scheduler.executors {
+		suite.scheduler.executors.Range(func(_ int64, executor *Executor) bool {
 			executor.executingTasks.Range(func(taskIndex string) bool {
 				keys = append(keys, taskIndex)
 				count++
 				return true
 			})
-		}
+			return true
+		})
 
 		if count == 0 {
 			return
diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go
index 796fddb8ff..8d13056c41 100644
--- a/internal/querycoordv2/utils/util.go
+++ b/internal/querycoordv2/utils/util.go
@@ -48,7 +48,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
 func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView, scope int32) error {
 	log := log.Ctx(context.TODO()).
 		WithRateGroup(fmt.Sprintf("util.CheckDelegatorDataReady-%d", leader.CollectionID), 1, 60).
-		With(zap.Int64("leaderID", leader.ID))
+		With(zap.Int64("leaderID", leader.ID), zap.Int64("collectionID", leader.CollectionID))
 	info := nodeMgr.Get(leader.ID)
 
 	// Check whether leader is online
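
The CheckDelegatorDataReady logger above is rate-grouped per collection. As I read pkg/log, WithRateGroup(name, 1, 60) lets the group accrue one credit per second up to a cap of 60, and each RatedWarn spends its cost before the line is emitted, so a hot retry loop logs this warning at most about once every 30 seconds with cost 30. A minimal usage sketch; the group name and loop are illustrative only:

package main

import (
	"context"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/pkg/log"
)

func main() {
	ctx := context.Background()
	for i := 0; i < 1000; i++ {
		// Most iterations are suppressed by the rate group; only the
		// occasional call has enough credit to actually emit the line.
		log.Ctx(ctx).WithRateGroup("example.hotLoop", 1, 60).
			RatedWarn(30.0, "delegator not ready yet", zap.Int("attempt", i))
	}
}
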
diff --git a/pkg/metrics/querycoord_metrics.go b/pkg/metrics/querycoord_metrics.go
index 67aa181421..d66abea2cd 100644
--- a/pkg/metrics/querycoord_metrics.go
+++ b/pkg/metrics/querycoord_metrics.go
@@ -36,6 +36,7 @@ const (
 
 	LeaderGrowTaskLabel   = "leader_grow"
 	LeaderReduceTaskLabel = "leader_reduce"
+	LeaderUpdateTaskLabel = "leader_update"
 
 	UnknownTaskLabel = "unknown"