fix: Fix panic due to failed to seek (#34229)

Converting the same msgposition's vchannel to a pchannel multiple times
would result in an invalid pchannel, leading to seek failure and panic.
This PR:
1. Make a copy of msgposition in msgdispatcher.
2. Check if channel is already a pchannel, no further channel conversion
is performed.

issue: https://github.com/milvus-io/milvus/issues/34221

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/34142/head
yihao.dai 2024-07-01 16:08:12 +08:00 committed by GitHub
parent b49862d4f3
commit ff51c7e628
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 26 additions and 9 deletions

View File

@ -96,6 +96,7 @@ func NewDispatcher(ctx context.Context,
return nil, err
}
if position != nil && len(position.MsgID) != 0 {
position = typeutil.Clone(position)
position.ChannelName = funcutil.ToPhysicalChannel(position.ChannelName)
err = stream.AsConsumer(ctx, []string{pchannel}, subName, common.SubscriptionPositionUnknown)
if err != nil {
@ -234,7 +235,7 @@ func (d *Dispatcher) work() {
}
}
if err != nil {
t.pos = pack.StartPositions[0]
t.pos = typeutil.Clone(pack.StartPositions[0])
// replace the pChannel with vChannel
t.pos.ChannelName = t.vchannel
d.lagTargets.Insert(t.vchannel, t)

View File

@ -46,14 +46,16 @@ func TestManager(t *testing.T) {
r := rand.Intn(10) + 1
for j := 0; j < r; j++ {
offset++
t.Logf("dyh add, %s", fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset))
_, err := c.Add(context.Background(), fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset), nil, common.SubscriptionPositionUnknown)
vchannel := fmt.Sprintf("mock-pchannel-dml_0_vchannelv%d", offset)
t.Logf("add vchannel, %s", vchannel)
_, err := c.Add(context.Background(), vchannel, nil, common.SubscriptionPositionUnknown)
assert.NoError(t, err)
assert.Equal(t, offset, c.Num())
}
for j := 0; j < rand.Intn(r); j++ {
t.Logf("dyh remove, %s", fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset))
c.Remove(fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset))
vchannel := fmt.Sprintf("mock-pchannel-dml_0_vchannelv%d", offset)
t.Logf("remove vchannel, %s", vchannel)
c.Remove(vchannel)
offset--
assert.Equal(t, offset, c.Num())
}
@ -166,7 +168,7 @@ func (suite *SimulationSuite) SetupSuite() {
}
func (suite *SimulationSuite) SetupTest() {
suite.pchannel = fmt.Sprintf("by-dev-rootcoord-dispatcher-simulation-dml-%d-%d", rand.Int(), time.Now().UnixNano())
suite.pchannel = fmt.Sprintf("by-dev-rootcoord-dispatcher-simulation-dml_%d", time.Now().UnixNano())
producer, err := newMockProducer(suite.factory, suite.pchannel)
assert.NoError(suite.T(), err)
suite.producer = producer

View File

@ -209,8 +209,16 @@ func GetAvailablePort() int {
return listener.Addr().(*net.TCPAddr).Port
}
// IsPhysicalChannel checks if the channel is a physical channel
func IsPhysicalChannel(channel string) bool {
return strings.Count(channel, "_") == 1
}
// ToPhysicalChannel get physical channel name from virtual channel name
func ToPhysicalChannel(vchannel string) string {
if IsPhysicalChannel(vchannel) {
return vchannel
}
index := strings.LastIndex(vchannel, "_")
if index < 0 {
return vchannel

View File

@ -174,11 +174,17 @@ func TestCheckPortAvailable(t *testing.T) {
}
func Test_ToPhysicalChannel(t *testing.T) {
assert.Equal(t, "abc", ToPhysicalChannel("abc_"))
assert.Equal(t, "abc", ToPhysicalChannel("abc_123"))
assert.Equal(t, "abc", ToPhysicalChannel("abc_defgsg"))
assert.Equal(t, "abc_", ToPhysicalChannel("abc_"))
assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123"))
assert.Equal(t, "abc_defgsg", ToPhysicalChannel("abc_defgsg"))
assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123_456"))
assert.Equal(t, "abc__", ToPhysicalChannel("abc___defgsg"))
assert.Equal(t, "abcdef", ToPhysicalChannel("abcdef"))
channel := "by-dev-rootcoord-dml_3_449883080965365748v0"
for i := 0; i < 10; i++ {
channel = ToPhysicalChannel(channel)
assert.Equal(t, "by-dev-rootcoord-dml_3", channel)
}
}
func Test_ConvertChannelName(t *testing.T) {