// Mirror of https://github.com/milvus-io/milvus.git (Go source, 1105 lines, 30 KiB).
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
|
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
|
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/milvus-io/milvus/internal/allocator"
|
|
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
|
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
|
|
"github.com/milvus-io/milvus/internal/util/etcd"
|
|
"github.com/milvus-io/milvus/internal/util/paramtable"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
var Params paramtable.BaseTable
|
|
var rmqPath = "/tmp/rocksmq"
|
|
var kvPathSuffix = "_kv"
|
|
var metaPathSuffix = "_meta"
|
|
|
|
func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator {
|
|
rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvPath)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV)
|
|
_ = idAllocator.Initialize()
|
|
return idAllocator
|
|
}
|
|
|
|
// chanNameSeq makes names produced by newChanName distinct even when two
// calls observe the same clock reading.
var chanNameSeq uint64

// newChanName returns a channel (topic) name of the form
// "my-chan-<unix-nanos>-<seq>".
//
// The previous implementation used time.Now().Nanosecond(), which is only the
// offset within the current second and therefore can repeat; the full
// UnixNano timestamp plus a process-wide atomic counter cannot.
func newChanName() string {
	seq := atomic.AddUint64(&chanNameSeq, 1)
	return fmt.Sprintf("my-chan-%d-%d", time.Now().UnixNano(), seq)
}
|
|
|
|
// groupNameSeq makes names produced by newGroupName distinct even when two
// calls observe the same clock reading.
var groupNameSeq uint64

// newGroupName returns a consumer-group name of the form
// "my-group-<unix-nanos>-<seq>".
//
// The previous implementation used time.Now().Nanosecond(), which is only the
// offset within the current second and therefore can repeat; the full
// UnixNano timestamp plus a process-wide atomic counter cannot.
func newGroupName() string {
	seq := atomic.AddUint64(&groupNameSeq, 1)
	return fmt.Sprintf("my-group-%d-%d", time.Now().UnixNano(), seq)
}
|
|
|
|
// etcdEndpoints returns the etcd endpoints to use in tests.
//
// The value of the ETCD_ENDPOINTS environment variable is split on commas;
// when the variable is unset or empty the single endpoint "localhost:2379"
// is returned.
func etcdEndpoints() []string {
	if raw := os.Getenv("ETCD_ENDPOINTS"); raw != "" {
		return strings.Split(raw, ",")
	}
	return []string{"localhost:2379"}
}
|
|
|
|
func TestRocksmq_RegisterConsumer(t *testing.T) {
|
|
suffix := "_register"
|
|
kvPath := rmqPath + kvPathSuffix + suffix
|
|
defer os.RemoveAll(kvPath)
|
|
idAllocator := InitIDAllocator(kvPath)
|
|
|
|
rocksdbPath := rmqPath + suffix
|
|
defer os.RemoveAll(rocksdbPath + kvSuffix)
|
|
defer os.RemoveAll(rocksdbPath)
|
|
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
|
|
assert.NoError(t, err)
|
|
defer rmq.Close()
|
|
|
|
topicName := "topic_register"
|
|
groupName := "group_register"
|
|
|
|
err = rmq.CreateTopic(topicName)
|
|
assert.NoError(t, err)
|
|
defer rmq.DestroyTopic(topicName)
|
|
|
|
err = rmq.CreateConsumerGroup(topicName, groupName)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyConsumerGroup(topicName, groupName)
|
|
|
|
consumer := &Consumer{
|
|
Topic: topicName,
|
|
GroupName: groupName,
|
|
MsgMutex: make(chan struct{}),
|
|
}
|
|
rmq.RegisterConsumer(consumer)
|
|
exist, _, _ := rmq.ExistConsumerGroup(topicName, groupName)
|
|
assert.Equal(t, exist, true)
|
|
dummyGrpName := "group_dummy"
|
|
exist, _, _ = rmq.ExistConsumerGroup(topicName, dummyGrpName)
|
|
assert.Equal(t, exist, false)
|
|
|
|
msgA := "a_message"
|
|
pMsgs := make([]ProducerMessage, 1)
|
|
pMsgA := ProducerMessage{Payload: []byte(msgA)}
|
|
pMsgs[0] = pMsgA
|
|
|
|
_, err = rmq.Produce(topicName, pMsgs)
|
|
assert.Nil(t, err)
|
|
|
|
rmq.Notify(topicName, groupName)
|
|
|
|
consumer1 := &Consumer{
|
|
Topic: topicName,
|
|
GroupName: groupName,
|
|
MsgMutex: make(chan struct{}),
|
|
}
|
|
rmq.RegisterConsumer(consumer1)
|
|
|
|
groupName2 := "group_register2"
|
|
consumer2 := &Consumer{
|
|
Topic: topicName,
|
|
GroupName: groupName2,
|
|
MsgMutex: make(chan struct{}),
|
|
}
|
|
rmq.RegisterConsumer(consumer2)
|
|
}
|
|
|
|
func TestRocksmq_Basic(t *testing.T) {
|
|
suffix := "_rmq"
|
|
|
|
kvPath := rmqPath + kvPathSuffix + suffix
|
|
defer os.RemoveAll(kvPath)
|
|
idAllocator := InitIDAllocator(kvPath)
|
|
|
|
rocksdbPath := rmqPath + suffix
|
|
defer os.RemoveAll(rocksdbPath + kvSuffix)
|
|
defer os.RemoveAll(rocksdbPath)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
channelName := "channel_rocks"
|
|
err = rmq.CreateTopic(channelName)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyTopic(channelName)
|
|
|
|
msgA := "a_message"
|
|
pMsgs := make([]ProducerMessage, 1)
|
|
pMsgA := ProducerMessage{Payload: []byte(msgA)}
|
|
pMsgs[0] = pMsgA
|
|
|
|
_, err = rmq.Produce(channelName, pMsgs)
|
|
assert.Nil(t, err)
|
|
|
|
pMsgB := ProducerMessage{Payload: []byte("b_message")}
|
|
pMsgC := ProducerMessage{Payload: []byte("c_message")}
|
|
|
|
pMsgs[0] = pMsgB
|
|
pMsgs = append(pMsgs, pMsgC)
|
|
_, err = rmq.Produce(channelName, pMsgs)
|
|
assert.Nil(t, err)
|
|
|
|
groupName := "test_group"
|
|
_ = rmq.DestroyConsumerGroup(channelName, groupName)
|
|
err = rmq.CreateConsumerGroup(channelName, groupName)
|
|
assert.Nil(t, err)
|
|
// double create consumer group
|
|
err = rmq.CreateConsumerGroup(channelName, groupName)
|
|
assert.Error(t, err)
|
|
cMsgs, err := rmq.Consume(channelName, groupName, 1)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(cMsgs), 1)
|
|
assert.Equal(t, string(cMsgs[0].Payload), "a_message")
|
|
|
|
cMsgs, err = rmq.Consume(channelName, groupName, 2)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(cMsgs), 2)
|
|
assert.Equal(t, string(cMsgs[0].Payload), "b_message")
|
|
assert.Equal(t, string(cMsgs[1].Payload), "c_message")
|
|
}
|
|
|
|
func TestRocksmq_MultiConsumer(t *testing.T) {
|
|
suffix := "rmq_multi_consumer"
|
|
kvPath := rmqPath + kvPathSuffix + suffix
|
|
defer os.RemoveAll(kvPath)
|
|
idAllocator := InitIDAllocator(kvPath)
|
|
|
|
rocksdbPath := rmqPath + suffix
|
|
defer os.RemoveAll(rocksdbPath + kvSuffix)
|
|
defer os.RemoveAll(rocksdbPath)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
atomic.StoreInt64(&RocksmqPageSize, 10)
|
|
|
|
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
channelName := "channel_rocks"
|
|
err = rmq.CreateTopic(channelName)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyTopic(channelName)
|
|
|
|
msgNum := 10
|
|
pMsgs := make([]ProducerMessage, msgNum)
|
|
for i := 0; i < msgNum; i++ {
|
|
msg := "message_" + strconv.Itoa(i)
|
|
pMsg := ProducerMessage{Payload: []byte(msg)}
|
|
pMsgs[i] = pMsg
|
|
}
|
|
ids, err := rmq.Produce(channelName, pMsgs)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(pMsgs), len(ids))
|
|
|
|
for i := 0; i <= 10; i++ {
|
|
groupName := "group_" + strconv.Itoa(i)
|
|
_ = rmq.DestroyConsumerGroup(channelName, groupName)
|
|
err = rmq.CreateConsumerGroup(channelName, groupName)
|
|
assert.Nil(t, err)
|
|
|
|
consumer := &Consumer{
|
|
Topic: channelName,
|
|
GroupName: groupName,
|
|
}
|
|
rmq.RegisterConsumer(consumer)
|
|
}
|
|
|
|
for i := 0; i <= 10; i++ {
|
|
groupName := "group_" + strconv.Itoa(i)
|
|
cMsgs, err := rmq.Consume(channelName, groupName, 10)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(cMsgs), 10)
|
|
assert.Equal(t, string(cMsgs[0].Payload), "message_0")
|
|
}
|
|
}
|
|
func TestRocksmq_Dummy(t *testing.T) {
|
|
suffix := "_dummy"
|
|
kvPath := rmqPath + kvPathSuffix + suffix
|
|
defer os.RemoveAll(kvPath)
|
|
idAllocator := InitIDAllocator(kvPath)
|
|
|
|
rocksdbPath := rmqPath + suffix
|
|
defer os.RemoveAll(rocksdbPath + kvSuffix)
|
|
defer os.RemoveAll(rocksdbPath)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
_, err = NewRocksMQ(params, "", idAllocator)
|
|
assert.Error(t, err)
|
|
|
|
channelName := "channel_a"
|
|
err = rmq.CreateTopic(channelName)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyTopic(channelName)
|
|
// create topic twice should be ignored
|
|
err = rmq.CreateTopic(channelName)
|
|
assert.Nil(t, err)
|
|
|
|
channelName1 := "channel_dummy"
|
|
topicMu.Store(channelName1, new(sync.Mutex))
|
|
err = rmq.DestroyTopic(channelName1)
|
|
assert.NoError(t, err)
|
|
|
|
err = rmq.DestroyConsumerGroup(channelName, channelName1)
|
|
assert.NoError(t, err)
|
|
|
|
_, err = rmq.Produce(channelName, nil)
|
|
assert.Error(t, err)
|
|
|
|
_, err = rmq.Produce(channelName1, nil)
|
|
assert.Error(t, err)
|
|
|
|
groupName1 := "group_dummy"
|
|
err = rmq.Seek(channelName1, groupName1, 0)
|
|
assert.Error(t, err)
|
|
|
|
channelName2 := strings.Repeat(channelName1, 100)
|
|
err = rmq.CreateTopic(string(channelName2))
|
|
assert.NoError(t, err)
|
|
_, err = rmq.Produce(string(channelName2), nil)
|
|
assert.Error(t, err)
|
|
|
|
channelName3 := "channel/dummy"
|
|
err = rmq.CreateTopic(channelName3)
|
|
assert.Error(t, err)
|
|
|
|
msgA := "a_message"
|
|
pMsgs := make([]ProducerMessage, 1)
|
|
pMsgA := ProducerMessage{Payload: []byte(msgA)}
|
|
pMsgs[0] = pMsgA
|
|
|
|
topicMu.Delete(channelName)
|
|
_, err = rmq.Consume(channelName, groupName1, 1)
|
|
assert.Error(t, err)
|
|
topicMu.Store(channelName, channelName)
|
|
_, err = rmq.Produce(channelName, nil)
|
|
assert.Error(t, err)
|
|
|
|
_, err = rmq.Consume(channelName, groupName1, 1)
|
|
assert.Error(t, err)
|
|
}
|
|
|
|
func TestRocksmq_Seek(t *testing.T) {
|
|
suffix := "_seek"
|
|
kvPath := rmqPath + kvPathSuffix + suffix
|
|
defer os.RemoveAll(kvPath)
|
|
idAllocator := InitIDAllocator(kvPath)
|
|
|
|
rocksdbPath := rmqPath + suffix
|
|
defer os.RemoveAll(rocksdbPath + kvSuffix)
|
|
defer os.RemoveAll(rocksdbPath)
|
|
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, rocksdbPath, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
_, err = NewRocksMQ(params, "", idAllocator)
|
|
assert.Error(t, err)
|
|
defer os.RemoveAll("_meta_kv")
|
|
|
|
channelName := "channel_seek"
|
|
err = rmq.CreateTopic(channelName)
|
|
assert.NoError(t, err)
|
|
defer rmq.DestroyTopic(channelName)
|
|
|
|
var seekID UniqueID
|
|
var seekID2 UniqueID
|
|
for i := 0; i < 100; i++ {
|
|
msg := "message_" + strconv.Itoa(i)
|
|
pMsg := ProducerMessage{Payload: []byte(msg)}
|
|
pMsgs := make([]ProducerMessage, 1)
|
|
pMsgs[0] = pMsg
|
|
id, err := rmq.Produce(channelName, pMsgs)
|
|
if i == 50 {
|
|
seekID = id[0]
|
|
}
|
|
if i == 51 {
|
|
seekID2 = id[0]
|
|
}
|
|
assert.Nil(t, err)
|
|
}
|
|
|
|
groupName1 := "group_dummy"
|
|
|
|
err = rmq.CreateConsumerGroup(channelName, groupName1)
|
|
assert.NoError(t, err)
|
|
err = rmq.Seek(channelName, groupName1, seekID)
|
|
assert.NoError(t, err)
|
|
|
|
messages, err := rmq.Consume(channelName, groupName1, 1)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, messages[0].MsgID, seekID)
|
|
|
|
messages, err = rmq.Consume(channelName, groupName1, 1)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, messages[0].MsgID, seekID2)
|
|
|
|
_ = rmq.DestroyConsumerGroup(channelName, groupName1)
|
|
|
|
}
|
|
|
|
func TestRocksmq_Loop(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
defer etcdCli.Close()
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
assert.Nil(t, err)
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_1"
|
|
_ = os.RemoveAll(name)
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
loopNum := 100
|
|
channelName := "channel_test"
|
|
err = rmq.CreateTopic(channelName)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyTopic(channelName)
|
|
|
|
// Produce one message once
|
|
for i := 0; i < loopNum; i++ {
|
|
msg := "message_" + strconv.Itoa(i)
|
|
pMsg := ProducerMessage{Payload: []byte(msg)}
|
|
pMsgs := make([]ProducerMessage, 1)
|
|
pMsgs[0] = pMsg
|
|
_, err := rmq.Produce(channelName, pMsgs)
|
|
assert.Nil(t, err)
|
|
}
|
|
|
|
// Produce loopNum messages once
|
|
pMsgs := make([]ProducerMessage, loopNum)
|
|
for i := 0; i < loopNum; i++ {
|
|
msg := "message_" + strconv.Itoa(i+loopNum)
|
|
pMsg := ProducerMessage{Payload: []byte(msg)}
|
|
pMsgs[i] = pMsg
|
|
}
|
|
_, err = rmq.Produce(channelName, pMsgs)
|
|
assert.Nil(t, err)
|
|
|
|
// Consume loopNum message once
|
|
groupName := "test_group"
|
|
_ = rmq.DestroyConsumerGroup(channelName, groupName)
|
|
err = rmq.CreateConsumerGroup(channelName, groupName)
|
|
assert.Nil(t, err)
|
|
cMsgs, err := rmq.Consume(channelName, groupName, loopNum)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(cMsgs), loopNum)
|
|
assert.Equal(t, string(cMsgs[0].Payload), "message_"+strconv.Itoa(0))
|
|
assert.Equal(t, string(cMsgs[loopNum-1].Payload), "message_"+strconv.Itoa(loopNum-1))
|
|
|
|
// Consume one message once
|
|
for i := 0; i < loopNum; i++ {
|
|
oneMsgs, err := rmq.Consume(channelName, groupName, 1)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(oneMsgs), 1)
|
|
assert.Equal(t, string(oneMsgs[0].Payload), "message_"+strconv.Itoa(i+loopNum))
|
|
}
|
|
|
|
cMsgs, err = rmq.Consume(channelName, groupName, 1)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(cMsgs), 0)
|
|
}
|
|
|
|
func TestRocksmq_Goroutines(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
defer etcdCli.Close()
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
assert.Nil(t, err)
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_2"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
loopNum := 100
|
|
channelName := "channel_test"
|
|
err = rmq.CreateTopic(channelName)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyTopic(channelName)
|
|
|
|
// Produce two message in each goroutine
|
|
msgChan := make(chan string, loopNum)
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < loopNum; i += 2 {
|
|
wg.Add(2)
|
|
go func(i int, mq RocksMQ) {
|
|
msg0 := "message_" + strconv.Itoa(i)
|
|
msg1 := "message_" + strconv.Itoa(i+1)
|
|
pMsg0 := ProducerMessage{Payload: []byte(msg0)}
|
|
pMsg1 := ProducerMessage{Payload: []byte(msg1)}
|
|
pMsgs := make([]ProducerMessage, 2)
|
|
pMsgs[0] = pMsg0
|
|
pMsgs[1] = pMsg1
|
|
|
|
ids, err := mq.Produce(channelName, pMsgs)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(pMsgs), len(ids))
|
|
msgChan <- msg0
|
|
msgChan <- msg1
|
|
}(i, rmq)
|
|
}
|
|
|
|
groupName := "test_group"
|
|
_ = rmq.DestroyConsumerGroup(channelName, groupName)
|
|
err = rmq.CreateConsumerGroup(channelName, groupName)
|
|
assert.Nil(t, err)
|
|
// Consume one message in each goroutine
|
|
for i := 0; i < loopNum; i++ {
|
|
go func(group *sync.WaitGroup, mq RocksMQ) {
|
|
defer group.Done()
|
|
<-msgChan
|
|
cMsgs, err := mq.Consume(channelName, groupName, 1)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(cMsgs), 1)
|
|
}(&wg, rmq)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
/**
|
|
This test is aim to measure RocksMq throughout.
|
|
Hardware:
|
|
CPU Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz
|
|
Disk SSD
|
|
|
|
Test with 1,000,000 message, result is as follow:
|
|
Produce: 190000 message / s
|
|
Consume: 90000 message / s
|
|
*/
|
|
func TestRocksmq_Throughout(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
defer etcdCli.Close()
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
assert.Nil(t, err)
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_3"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
channelName := "channel_throughout_test"
|
|
err = rmq.CreateTopic(channelName)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyTopic(channelName)
|
|
|
|
entityNum := 100000
|
|
|
|
pt0 := time.Now().UnixNano() / int64(time.Millisecond)
|
|
for i := 0; i < entityNum; i++ {
|
|
msg := "message_" + strconv.Itoa(i)
|
|
pMsg := ProducerMessage{Payload: []byte(msg)}
|
|
ids, err := rmq.Produce(channelName, []ProducerMessage{pMsg})
|
|
assert.Nil(t, err)
|
|
assert.EqualValues(t, 1, len(ids))
|
|
}
|
|
pt1 := time.Now().UnixNano() / int64(time.Millisecond)
|
|
pDuration := pt1 - pt0
|
|
log.Printf("Total produce %d item, cost %v ms, throughout %v / s", entityNum, pDuration, int64(entityNum)*1000/pDuration)
|
|
|
|
groupName := "test_throughout_group"
|
|
_ = rmq.DestroyConsumerGroup(channelName, groupName)
|
|
err = rmq.CreateConsumerGroup(channelName, groupName)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyConsumerGroup(groupName, channelName)
|
|
|
|
// Consume one message in each goroutine
|
|
ct0 := time.Now().UnixNano() / int64(time.Millisecond)
|
|
for i := 0; i < entityNum; i++ {
|
|
cMsgs, err := rmq.Consume(channelName, groupName, 1)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(cMsgs), 1)
|
|
}
|
|
ct1 := time.Now().UnixNano() / int64(time.Millisecond)
|
|
cDuration := ct1 - ct0
|
|
log.Printf("Total consume %d item, cost %v ms, throughout %v / s", entityNum, cDuration, int64(entityNum)*1000/cDuration)
|
|
}
|
|
|
|
func TestRocksmq_MultiChan(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
defer etcdCli.Close()
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
assert.Nil(t, err)
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_multichan"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
channelName0 := "chan01"
|
|
channelName1 := "chan11"
|
|
err = rmq.CreateTopic(channelName0)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyTopic(channelName0)
|
|
err = rmq.CreateTopic(channelName1)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyTopic(channelName1)
|
|
assert.Nil(t, err)
|
|
|
|
loopNum := 10
|
|
for i := 0; i < loopNum; i++ {
|
|
msg0 := "for_chann0_" + strconv.Itoa(i)
|
|
msg1 := "for_chann1_" + strconv.Itoa(i)
|
|
pMsg0 := ProducerMessage{Payload: []byte(msg0)}
|
|
pMsg1 := ProducerMessage{Payload: []byte(msg1)}
|
|
_, err = rmq.Produce(channelName0, []ProducerMessage{pMsg0})
|
|
assert.Nil(t, err)
|
|
_, err = rmq.Produce(channelName1, []ProducerMessage{pMsg1})
|
|
assert.Nil(t, err)
|
|
}
|
|
|
|
groupName := "test_group"
|
|
_ = rmq.DestroyConsumerGroup(channelName1, groupName)
|
|
err = rmq.CreateConsumerGroup(channelName1, groupName)
|
|
assert.Nil(t, err)
|
|
cMsgs, err := rmq.Consume(channelName1, groupName, 1)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(cMsgs), 1)
|
|
assert.Equal(t, string(cMsgs[0].Payload), "for_chann1_"+strconv.Itoa(0))
|
|
}
|
|
|
|
func TestRocksmq_CopyData(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
defer etcdCli.Close()
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
assert.Nil(t, err)
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_copydata"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
channelName0 := "test_chan01"
|
|
channelName1 := "test_chan11"
|
|
err = rmq.CreateTopic(channelName0)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyTopic(channelName0)
|
|
err = rmq.CreateTopic(channelName1)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyTopic(channelName1)
|
|
assert.Nil(t, err)
|
|
|
|
msg0 := "abcde"
|
|
pMsg0 := ProducerMessage{Payload: []byte(msg0)}
|
|
_, err = rmq.Produce(channelName0, []ProducerMessage{pMsg0})
|
|
assert.Nil(t, err)
|
|
|
|
pMsg1 := ProducerMessage{Payload: nil}
|
|
_, err = rmq.Produce(channelName1, []ProducerMessage{pMsg1})
|
|
assert.Nil(t, err)
|
|
|
|
pMsg2 := ProducerMessage{Payload: []byte{}}
|
|
_, err = rmq.Produce(channelName1, []ProducerMessage{pMsg2})
|
|
assert.Nil(t, err)
|
|
|
|
var emptyTargetData []byte
|
|
pMsg3 := ProducerMessage{Payload: emptyTargetData}
|
|
_, err = rmq.Produce(channelName1, []ProducerMessage{pMsg3})
|
|
assert.Nil(t, err)
|
|
|
|
groupName := "test_group"
|
|
_ = rmq.DestroyConsumerGroup(channelName0, groupName)
|
|
err = rmq.CreateConsumerGroup(channelName0, groupName)
|
|
assert.Nil(t, err)
|
|
cMsgs0, err := rmq.Consume(channelName0, groupName, 1)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(cMsgs0), 1)
|
|
assert.Equal(t, string(cMsgs0[0].Payload), msg0)
|
|
|
|
_ = rmq.DestroyConsumerGroup(channelName1, groupName)
|
|
err = rmq.CreateConsumerGroup(channelName1, groupName)
|
|
assert.Nil(t, err)
|
|
cMsgs1, err := rmq.Consume(channelName1, groupName, 3)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, 3, len(cMsgs1))
|
|
assert.Equal(t, emptyTargetData, cMsgs1[0].Payload)
|
|
}
|
|
|
|
func TestRocksmq_SeekToLatest(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
defer etcdCli.Close()
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
assert.Nil(t, err)
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_seektolatest"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
channelName := "channel_test"
|
|
err = rmq.CreateTopic(channelName)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyTopic(channelName)
|
|
loopNum := 100
|
|
|
|
err = rmq.SeekToLatest(channelName, "dummy_group")
|
|
assert.Error(t, err)
|
|
|
|
// Consume loopNum message once
|
|
groupName := "group_test"
|
|
_ = rmq.DestroyConsumerGroup(channelName, groupName)
|
|
err = rmq.CreateConsumerGroup(channelName, groupName)
|
|
assert.Nil(t, err)
|
|
|
|
err = rmq.SeekToLatest(channelName, groupName)
|
|
assert.NoError(t, err)
|
|
|
|
channelNamePrev := "channel_tes"
|
|
err = rmq.CreateTopic(channelNamePrev)
|
|
assert.Nil(t, err)
|
|
defer rmq.DestroyTopic(channelNamePrev)
|
|
pMsgs := make([]ProducerMessage, loopNum)
|
|
for i := 0; i < loopNum; i++ {
|
|
msg := "message_" + strconv.Itoa(i)
|
|
pMsg := ProducerMessage{Payload: []byte(msg)}
|
|
pMsgs[i] = pMsg
|
|
}
|
|
_, err = rmq.Produce(channelNamePrev, pMsgs)
|
|
assert.Nil(t, err)
|
|
|
|
// should hit the case where channel is null
|
|
err = rmq.SeekToLatest(channelName, groupName)
|
|
assert.NoError(t, err)
|
|
|
|
ids, err := rmq.Produce(channelName, pMsgs)
|
|
assert.Nil(t, err)
|
|
|
|
// able to read out
|
|
cMsgs, err := rmq.Consume(channelName, groupName, loopNum)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(cMsgs), loopNum)
|
|
for i := 0; i < loopNum; i++ {
|
|
assert.Equal(t, cMsgs[i].MsgID, ids[i])
|
|
}
|
|
|
|
err = rmq.SeekToLatest(channelName, groupName)
|
|
assert.NoError(t, err)
|
|
|
|
cMsgs, err = rmq.Consume(channelName, groupName, loopNum)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(cMsgs), 0)
|
|
|
|
pMsgs = make([]ProducerMessage, loopNum)
|
|
for i := 0; i < loopNum; i++ {
|
|
msg := "message_" + strconv.Itoa(i+loopNum)
|
|
pMsg := ProducerMessage{Payload: []byte(msg)}
|
|
pMsgs[i] = pMsg
|
|
}
|
|
ids, err = rmq.Produce(channelName, pMsgs)
|
|
assert.Nil(t, err)
|
|
|
|
// make sure we only consume the latest message
|
|
cMsgs, err = rmq.Consume(channelName, groupName, loopNum)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(cMsgs), loopNum)
|
|
for i := 0; i < loopNum; i++ {
|
|
assert.Equal(t, cMsgs[i].MsgID, ids[i])
|
|
}
|
|
}
|
|
|
|
func TestRocksmq_GetLatestMsg(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
defer etcdCli.Close()
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
assert.Nil(t, err)
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_data"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
|
|
channelName := newChanName()
|
|
err = rmq.CreateTopic(channelName)
|
|
assert.Nil(t, err)
|
|
|
|
// Consume loopNum message once
|
|
groupName := "last_msg_test"
|
|
_ = rmq.DestroyConsumerGroup(channelName, groupName)
|
|
err = rmq.CreateConsumerGroup(channelName, groupName)
|
|
assert.Nil(t, err)
|
|
|
|
msgID, err := rmq.GetLatestMsg(channelName)
|
|
assert.Equal(t, msgID, int64(DefaultMessageID))
|
|
assert.Nil(t, err)
|
|
|
|
loopNum := 10
|
|
pMsgs1 := make([]ProducerMessage, loopNum)
|
|
pMsgs2 := make([]ProducerMessage, loopNum)
|
|
for i := 0; i < loopNum; i++ {
|
|
msg := "message_" + strconv.Itoa(i)
|
|
pMsg := ProducerMessage{Payload: []byte(msg)}
|
|
pMsgs1[i] = pMsg
|
|
|
|
msg = "2message_" + strconv.Itoa(i)
|
|
pMsg = ProducerMessage{Payload: []byte(msg)}
|
|
pMsgs2[i] = pMsg
|
|
}
|
|
|
|
ids, err := rmq.Produce(channelName, pMsgs1)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(ids), loopNum)
|
|
|
|
// test latest msg when one topic is created
|
|
msgID, err = rmq.GetLatestMsg(channelName)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, msgID, ids[loopNum-1])
|
|
|
|
// test latest msg when two topics are created
|
|
channelName2 := newChanName()
|
|
err = rmq.CreateTopic(channelName2)
|
|
assert.Nil(t, err)
|
|
ids, err = rmq.Produce(channelName2, pMsgs2)
|
|
assert.Nil(t, err)
|
|
|
|
msgID, err = rmq.GetLatestMsg(channelName2)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, msgID, ids[loopNum-1])
|
|
|
|
// test close rmq
|
|
rmq.DestroyTopic(channelName)
|
|
rmq.Close()
|
|
msgID, err = rmq.GetLatestMsg(channelName)
|
|
assert.Equal(t, msgID, int64(DefaultMessageID))
|
|
assert.NotNil(t, err)
|
|
}
|
|
|
|
func TestRocksmq_Close(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
defer etcdCli.Close()
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
assert.Nil(t, err)
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_close"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
atomic.StoreInt64(&rmq.state, RmqStateStopped)
|
|
assert.Error(t, rmq.CreateTopic(""))
|
|
assert.Error(t, rmq.CreateConsumerGroup("", ""))
|
|
rmq.RegisterConsumer(&Consumer{})
|
|
_, err = rmq.Produce("", nil)
|
|
assert.Error(t, err)
|
|
_, err = rmq.Consume("", "", 0)
|
|
assert.Error(t, err)
|
|
|
|
assert.Error(t, rmq.seek("", "", 0))
|
|
assert.Error(t, rmq.ForceSeek("", "", 0))
|
|
assert.Error(t, rmq.SeekToLatest("", ""))
|
|
}
|
|
|
|
func TestRocksmq_SeekWithNoConsumerError(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_seekerror"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
rmq.CreateTopic("test")
|
|
err = rmq.Seek("test", "", 0)
|
|
fmt.Println(err)
|
|
assert.Error(t, err)
|
|
assert.Error(t, rmq.ForceSeek("test", "", 0))
|
|
}
|
|
|
|
func TestRocksmq_SeekTopicNotExistError(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_seekerror2"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
assert.Error(t, rmq.Seek("test_topic_not_exist", "", 0))
|
|
assert.Error(t, rmq.ForceSeek("test_topic_not_exist", "", 0))
|
|
}
|
|
|
|
func TestRocksmq_SeekTopicMutexError(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_seekerror2"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
topicMu.Store("test_topic_mutix_error", nil)
|
|
assert.Error(t, rmq.Seek("test_topic_mutix_error", "", 0))
|
|
assert.Error(t, rmq.ForceSeek("test_topic_mutix_error", "", 0))
|
|
}
|
|
|
|
func TestRocksmq_moveConsumePosError(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_moveconsumeposerror"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
rmq.CreateTopic("test_moveConsumePos")
|
|
assert.Error(t, rmq.moveConsumePos("test_moveConsumePos", "", 0))
|
|
}
|
|
|
|
func TestRocksmq_updateAckedInfoErr(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_updateackedinfoerror"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
atomic.StoreInt64(&RocksmqPageSize, 10)
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
topicName := "test_updateAckedInfo"
|
|
rmq.CreateTopic(topicName)
|
|
defer rmq.DestroyTopic(topicName)
|
|
|
|
//add message, make sure rmq has more than one page
|
|
msgNum := 100
|
|
pMsgs := make([]ProducerMessage, msgNum)
|
|
for i := 0; i < msgNum; i++ {
|
|
msg := "message_" + strconv.Itoa(i)
|
|
pMsg := ProducerMessage{Payload: []byte(msg)}
|
|
pMsgs[i] = pMsg
|
|
}
|
|
ids, err := rmq.Produce(topicName, pMsgs)
|
|
assert.Nil(t, err)
|
|
assert.Equal(t, len(pMsgs), len(ids))
|
|
|
|
groupName := "test"
|
|
for i := 0; i < 2; i++ {
|
|
consumer := &Consumer{
|
|
Topic: topicName,
|
|
GroupName: groupName + strconv.Itoa(i),
|
|
}
|
|
//make sure consumer not in rmq.consumersID
|
|
_ = rmq.DestroyConsumerGroup(topicName, groupName)
|
|
//add consumer to rmq.consumers
|
|
rmq.RegisterConsumer(consumer)
|
|
}
|
|
|
|
// update acked for all page in rmq but some consumer not in rmq.consumers
|
|
assert.Error(t, rmq.updateAckedInfo(topicName, groupName, 0, ids[len(ids)-1]))
|
|
}
|
|
|
|
func TestRocksmq_Info(t *testing.T) {
|
|
ep := etcdEndpoints()
|
|
etcdCli, err := etcd.GetRemoteEtcdClient(ep)
|
|
assert.Nil(t, err)
|
|
etcdKV := etcdkv.NewEtcdKV(etcdCli, "/etcd/test/root")
|
|
defer etcdKV.Close()
|
|
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
|
_ = idAllocator.Initialize()
|
|
|
|
name := "/tmp/rocksmq_testinfo"
|
|
defer os.RemoveAll(name)
|
|
kvName := name + "_meta_kv"
|
|
_ = os.RemoveAll(kvName)
|
|
defer os.RemoveAll(kvName)
|
|
var params paramtable.BaseTable
|
|
params.Init()
|
|
atomic.StoreInt64(&RocksmqPageSize, 10)
|
|
rmq, err := NewRocksMQ(params, name, idAllocator)
|
|
assert.Nil(t, err)
|
|
defer rmq.Close()
|
|
|
|
topicName := "test_testinfo"
|
|
groupName := "test"
|
|
rmq.CreateTopic(topicName)
|
|
defer rmq.DestroyTopic(topicName)
|
|
|
|
consumer := &Consumer{
|
|
Topic: topicName,
|
|
GroupName: groupName,
|
|
}
|
|
|
|
_ = rmq.DestroyConsumerGroup(topicName, groupName)
|
|
err = rmq.CreateConsumerGroup(topicName, groupName)
|
|
assert.Nil(t, err)
|
|
|
|
err = rmq.RegisterConsumer(consumer)
|
|
assert.Nil(t, err)
|
|
|
|
assert.True(t, rmq.Info())
|
|
|
|
//test error
|
|
rmq.kv = &rocksdbkv.RocksdbKV{}
|
|
assert.False(t, rmq.Info())
|
|
}
|