fix: [2.4] Remove loopclosure issue in ChannelManagerImplV2 (#33989) (#34004)

Cherry-pick from master
pr: #33989
See also #33987

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/34019/head
congqixia 2024-06-20 14:42:00 +08:00 committed by GitHub
parent a7ae45c91c
commit 4424c9e5e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 4 additions and 2 deletions

View File

@ -529,6 +529,7 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies
if channelCount == 0 {
continue
}
nodeID := nodeAssign.NodeID
var (
succeededChannels = make([]RWChannel, 0, channelCount)
@ -548,7 +549,7 @@ func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies
tmpWatchInfo.Vchan = m.h.GetDataVChanPositions(innerCh, allPartitionID)
future := getOrCreateIOPool().Submit(func() (any, error) {
err := m.Notify(ctx, nodeAssign.NodeID, tmpWatchInfo)
err := m.Notify(ctx, nodeID, tmpWatchInfo)
return innerCh, err
})
futures = append(futures, future)
@ -591,6 +592,7 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*
continue
}
nodeID := nodeAssign.NodeID
futures := make([]*conc.Future[any], 0, len(nodeAssign.Channels))
chNames := lo.Keys(nodeAssign.Channels)
@ -603,7 +605,7 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*
innerCh := ch
future := getOrCreateIOPool().Submit(func() (any, error) {
successful, got := m.Check(ctx, nodeAssign.NodeID, innerCh.GetWatchInfo())
successful, got := m.Check(ctx, nodeID, innerCh.GetWatchInfo())
if got {
return poolResult{
successful: successful,