Improve rocksmq client code coverage (#7540)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/7557/merge
yukun 2021-09-08 11:03:59 +08:00 committed by GitHub
parent 4785e452be
commit fbc352263c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 212 additions and 6 deletions

View File

@ -18,7 +18,9 @@ import (
type RocksMQ = server.RocksMQ
func NewClient(options ClientOptions) (Client, error) {
options.Server = server.Rmq
if options.Server == nil {
options.Server = server.Rmq
}
return newClient(options)
}

View File

@ -23,11 +23,18 @@ func TestClient(t *testing.T) {
assert.Nil(t, err)
}
func TestCreateProducer(t *testing.T) {
func TestClient_CreateProducer(t *testing.T) {
var client0 client
producer0, err := client0.CreateProducer(ProducerOptions{})
assert.Nil(t, producer0)
assert.Error(t, err)
/////////////////////////////////////////////////
client, err := NewClient(ClientOptions{
Server: newMockRocksMQ(),
})
assert.NoError(t, err)
defer client.Close()
producer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
@ -35,10 +42,32 @@ func TestCreateProducer(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, producer)
client.Close()
/////////////////////////////////////////////////
rmqPath := "/tmp/milvus/test_client1"
rmq := newRocksMQ(rmqPath)
defer removePath(rmqPath)
client1, err := NewClient(ClientOptions{
Server: rmq,
})
assert.NoError(t, err)
defer client1.Close()
producer1, err := client1.CreateProducer(ProducerOptions{
Topic: newTopicName(),
})
assert.NotNil(t, producer1)
assert.NoError(t, err)
defer producer1.Close()
// /////////////////////////////////////////////////
// dummyTopic := strings.Repeat(newTopicName(), 100)
// producer2, err := client1.CreateProducer(ProducerOptions{
// Topic: dummyTopic,
// })
// assert.Nil(t, producer2)
// assert.Error(t, err)
}
func TestSubscribe(t *testing.T) {
func TestClient_Subscribe(t *testing.T) {
client, err := NewClient(ClientOptions{
Server: newMockRocksMQ(),
})
@ -52,4 +81,63 @@ func TestSubscribe(t *testing.T) {
assert.Nil(t, consumer)
client.Close()
/////////////////////////////////////////////////
rmqPath := "/tmp/milvus/test_client2"
rmq := newRocksMQ(rmqPath)
defer removePath(rmqPath)
client1, err := NewClient(ClientOptions{
Server: rmq,
})
assert.NoError(t, err)
defer client1.Close()
opt := ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
}
consumer1, err := client1.Subscribe(opt)
assert.NoError(t, err)
assert.NotNil(t, consumer1)
consumer2, err := client1.Subscribe(opt)
assert.NoError(t, err)
assert.NotNil(t, consumer2)
producer1, err := client1.CreateProducer(ProducerOptions{
Topic: newTopicName(),
})
assert.NotNil(t, producer1)
assert.NoError(t, err)
}
func TestClient_consume(t *testing.T) {
rmqPath := "/tmp/milvus/test_client3"
rmq := newRocksMQ(rmqPath)
defer removePath(rmqPath)
client, err := NewClient(ClientOptions{
Server: rmq,
})
assert.NoError(t, err)
defer client.Close()
topicName := newTopicName()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.NotNil(t, producer)
assert.NoError(t, err)
opt := ConsumerOptions{
Topic: topicName,
SubscriptionName: newConsumerName(),
}
consumer, err := client.Subscribe(opt)
assert.NoError(t, err)
assert.NotNil(t, consumer)
msg := &ProducerMessage{
Payload: make([]byte, 10),
}
producer.Send(msg)
<-consumer.Chan()
}

View File

@ -17,7 +17,9 @@ import (
"github.com/stretchr/testify/assert"
)
func TestConsumer(t *testing.T) {
func TestConsumer_newConsumer(t *testing.T) {
assert.Equal(t, EarliestMessageID(), int64(-1))
consumer, err := newConsumer(nil, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
@ -32,15 +34,73 @@ func TestConsumer(t *testing.T) {
assert.NotNil(t, err)
assert.Equal(t, InvalidConfiguration, err.(*Error).Result())
consumer, err = newConsumer1(newMockClient(), ConsumerOptions{}, nil)
assert.Nil(t, consumer)
assert.NotNil(t, err)
assert.Equal(t, InvalidConfiguration, err.(*Error).Result())
consumer, err = newConsumer(newMockClient(), ConsumerOptions{
Topic: newTopicName(),
})
assert.Nil(t, consumer)
assert.NotNil(t, err)
assert.Equal(t, InvalidConfiguration, err.(*Error).Result())
/////////////////////////////////////////////////
rmqPath := "/tmp/milvus/test_consumer1"
rmq := newRocksMQ(rmqPath)
defer removePath(rmqPath)
client, err := newClient(ClientOptions{
Server: rmq,
})
assert.NoError(t, err)
assert.NotNil(t, client)
defer client.Close()
consumerName := newConsumerName()
consumer1, err := newConsumer(client, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: consumerName,
})
assert.NoError(t, err)
assert.NotNil(t, consumer1)
defer consumer1.Close()
assert.Equal(t, consumerName, consumer1.Subscription())
consumer2, err := newConsumer(client, ConsumerOptions{
Topic: "",
})
assert.Error(t, err)
assert.Nil(t, consumer2)
consumer3, err := newConsumer(client, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: "",
})
assert.Error(t, err)
assert.Nil(t, consumer3)
consumer4, err := newConsumer1(client, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: newConsumerName(),
}, nil)
assert.NoError(t, err)
assert.NotNil(t, consumer4)
consumer5, err := newConsumer1(client, ConsumerOptions{
Topic: "",
}, nil)
assert.Error(t, err)
assert.Nil(t, consumer5)
consumer6, err := newConsumer1(client, ConsumerOptions{
Topic: newTopicName(),
SubscriptionName: "",
}, nil)
assert.Error(t, err)
assert.Nil(t, consumer6)
}
func TestSubscription(t *testing.T) {
func TestConsumer_Subscription(t *testing.T) {
topicName := newTopicName()
consumerName := newConsumerName()
consumer, err := newConsumer(newMockClient(), ConsumerOptions{
@ -51,3 +111,26 @@ func TestSubscription(t *testing.T) {
assert.NotNil(t, err)
//assert.Equal(t, consumerName, consumer.Subscription())
}
func TestConsumer_Seek(t *testing.T) {
rmqPath := "/tmp/milvus/test_consumer2"
rmq := newRocksMQ(rmqPath)
defer removePath(rmqPath)
client, err := newClient(ClientOptions{
Server: rmq,
})
assert.NoError(t, err)
assert.NotNil(t, client)
defer client.Close()
topicName := newTopicName()
consumerName := newConsumerName()
consumer, err := newConsumer(client, ConsumerOptions{
Topic: topicName,
SubscriptionName: consumerName,
})
assert.NoError(t, err)
assert.NotNil(t, consumer)
consumer.Seek(0)
}

View File

@ -13,8 +13,12 @@ package rocksmq
import (
"fmt"
"os"
"time"
"github.com/milvus-io/milvus/internal/allocator"
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
rocksmq "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
server "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
)
@ -37,3 +41,32 @@ func newMockClient() *client {
})
return client
}
func initIDAllocator(kvPath string) *allocator.GlobalIDAllocator {
rocksdbKV, err := rocksdbkv.NewRocksdbKV(kvPath)
if err != nil {
panic(err)
}
idAllocator := allocator.NewGlobalIDAllocator("rmq_id", rocksdbKV)
_ = idAllocator.Initialize()
return idAllocator
}
func newRocksMQ(rmqPath string) server.RocksMQ {
kvPath := rmqPath + "_kv"
idAllocator := initIDAllocator(kvPath)
rocksdbPath := rmqPath + "_db"
rmq, _ := rocksmq.NewRocksMQ(rocksdbPath, idAllocator)
return rmq
}
func removePath(rmqPath string) {
kvPath := rmqPath + "_kv"
os.RemoveAll(kvPath)
rocksdbPath := rmqPath + "_db"
os.RemoveAll(rocksdbPath)
metaPath := rmqPath + "_meta_kv"
os.RemoveAll(metaPath)
}