mirror of https://github.com/milvus-io/milvus.git
Improve global_rmq code coverage (#7928)
Signed-off-by: fishpenguin <kun.yu@zilliz.com>pull/7988/head
parent
2398dafbca
commit
13f9b32452
|
@ -90,7 +90,10 @@ func CloseRocksMQ() {
|
|||
}
|
||||
return true
|
||||
})
|
||||
Rmq.store.Close()
|
||||
// FIXME(yukun): When close Rmq.store, there may be some goroutines in rocksmq.Consume() using the
|
||||
// store instance, so this may cause crash. Needs to send a mutex to rocksmq to stop using store
|
||||
// when Rmq needs to be closed.
|
||||
// Rmq.store.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -565,10 +565,6 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
|
|||
key.Free()
|
||||
val.Free()
|
||||
}
|
||||
if err := iter.Err(); err != nil {
|
||||
log.Debug("RocksMQ: get error from iter.Err()")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// When already consume to last mes, an empty slice will be returned
|
||||
if len(consumerMessage) == 0 {
|
||||
|
|
|
@ -86,6 +86,21 @@ func Test_InitRocksMQ(t *testing.T) {
|
|||
defer Rmq.stopRetention()
|
||||
assert.NoError(t, err)
|
||||
defer CloseRocksMQ()
|
||||
|
||||
topicName := "topic_register"
|
||||
err = Rmq.CreateTopic(topicName)
|
||||
assert.NoError(t, err)
|
||||
groupName := "group_register"
|
||||
_ = Rmq.DestroyConsumerGroup(topicName, groupName)
|
||||
err = Rmq.CreateConsumerGroup(topicName, groupName)
|
||||
assert.Nil(t, err)
|
||||
|
||||
consumer := &Consumer{
|
||||
Topic: topicName,
|
||||
GroupName: groupName,
|
||||
MsgMutex: make(chan struct{}),
|
||||
}
|
||||
Rmq.RegisterConsumer(consumer)
|
||||
}
|
||||
|
||||
func TestRocksmq_RegisterConsumer(t *testing.T) {
|
||||
|
|
|
@ -81,9 +81,6 @@ func prefixLoad(db *gorocksdb.DB, prefix string) ([]string, []string, error) {
|
|||
values = append(values, string(value.Data()))
|
||||
value.Free()
|
||||
}
|
||||
if err := iter.Err(); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return keys, values, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -110,6 +110,8 @@ func TestRetentionInfo_LoadRetentionInfo(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
defer rmq.DestroyTopic(topicName)
|
||||
|
||||
rmq.retentionInfo.startRetentionInfo()
|
||||
|
||||
rmq.retentionInfo.ackedInfo.Delete(topicName)
|
||||
|
||||
msgNum := 100
|
||||
|
|
Loading…
Reference in New Issue