mirror of https://github.com/milvus-io/milvus.git
When the main dispatcher has not yet consumed data, curTs is 0. During this time, merging dispatchers should not be allowed; otherwise, the data of the solo dispatcher will be skipped. issue: https://github.com/milvus-io/milvus/issues/34255 pr: https://github.com/milvus-io/milvus/pull/34562 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/34592/head
parent
15adb2feac
commit
cbba5481f8
|
@ -182,7 +182,7 @@ func (c *dispatcherManager) tryMerge() {
|
|||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.mainDispatcher == nil {
|
||||
if c.mainDispatcher == nil || c.mainDispatcher.CurTs() == 0 {
|
||||
return
|
||||
}
|
||||
candidates := make(map[string]struct{})
|
||||
|
|
|
@ -74,6 +74,12 @@ func TestManager(t *testing.T) {
|
|||
_, err = c.Add(ctx, "mock_vchannel_2", nil, common.SubscriptionPositionUnknown)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 3, c.Num())
|
||||
c.(*dispatcherManager).mainDispatcher.curTs.Store(1000)
|
||||
c.(*dispatcherManager).mu.RLock()
|
||||
for _, d := range c.(*dispatcherManager).soloDispatchers {
|
||||
d.curTs.Store(1000)
|
||||
}
|
||||
c.(*dispatcherManager).mu.RUnlock()
|
||||
|
||||
c.(*dispatcherManager).tryMerge()
|
||||
assert.Equal(t, 1, c.Num())
|
||||
|
@ -99,6 +105,12 @@ func TestManager(t *testing.T) {
|
|||
_, err = c.Add(ctx, "mock_vchannel_2", nil, common.SubscriptionPositionUnknown)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 3, c.Num())
|
||||
c.(*dispatcherManager).mainDispatcher.curTs.Store(1000)
|
||||
c.(*dispatcherManager).mu.RLock()
|
||||
for _, d := range c.(*dispatcherManager).soloDispatchers {
|
||||
d.curTs.Store(1000)
|
||||
}
|
||||
c.(*dispatcherManager).mu.RUnlock()
|
||||
|
||||
checkIntervalK := paramtable.Get().MQCfg.MergeCheckInterval.Key
|
||||
paramtable.Get().Save(checkIntervalK, "0.01")
|
||||
|
|
Loading…
Reference in New Issue