mirror of https://github.com/milvus-io/milvus.git
Deep copy rocksdb data to byte slice (#7605)
Add unittest Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/7614/head
parent
0546437789
commit
89358ad91e
|
@ -541,8 +541,15 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
|
|||
return nil, err
|
||||
}
|
||||
msg := ConsumerMessage{
|
||||
MsgID: msgID,
|
||||
Payload: val.Data(),
|
||||
MsgID: msgID,
|
||||
}
|
||||
origData := val.Data()
|
||||
dataLen := len(origData)
|
||||
if dataLen == 0 {
|
||||
msg.Payload = nil
|
||||
} else {
|
||||
msg.Payload = make([]byte, dataLen)
|
||||
copy(msg.Payload, origData)
|
||||
}
|
||||
consumerMessage = append(consumerMessage, msg)
|
||||
key.Free()
|
||||
|
|
|
@ -494,3 +494,67 @@ func TestRocksMQ_MultiChan(t *testing.T) {
|
|||
assert.Equal(t, len(cMsgs), 1)
|
||||
assert.Equal(t, string(cMsgs[0].Payload), "for_chann1_"+strconv.Itoa(0))
|
||||
}
|
||||
|
||||
func TestRocksMQ_CopyData(t *testing.T) {
|
||||
ep := etcdEndpoints()
|
||||
etcdKV, err := etcdkv.NewEtcdKV(ep, "/etcd/test/root")
|
||||
assert.Nil(t, err)
|
||||
defer etcdKV.Close()
|
||||
idAllocator := allocator.NewGlobalIDAllocator("dummy", etcdKV)
|
||||
_ = idAllocator.Initialize()
|
||||
|
||||
name := "/tmp/rocksmq_copydata"
|
||||
defer os.RemoveAll(name)
|
||||
kvName := name + "_meta_kv"
|
||||
_ = os.RemoveAll(kvName)
|
||||
defer os.RemoveAll(kvName)
|
||||
rmq, err := NewRocksMQ(name, idAllocator)
|
||||
assert.Nil(t, err)
|
||||
defer rmq.stopRetention()
|
||||
|
||||
channelName0 := "test_chan01"
|
||||
channelName1 := "test_chan11"
|
||||
err = rmq.CreateTopic(channelName0)
|
||||
assert.Nil(t, err)
|
||||
defer rmq.DestroyTopic(channelName0)
|
||||
err = rmq.CreateTopic(channelName1)
|
||||
assert.Nil(t, err)
|
||||
defer rmq.DestroyTopic(channelName1)
|
||||
assert.Nil(t, err)
|
||||
|
||||
msg0 := "abcde"
|
||||
pMsg0 := ProducerMessage{Payload: []byte(msg0)}
|
||||
err = rmq.Produce(channelName0, []ProducerMessage{pMsg0})
|
||||
assert.Nil(t, err)
|
||||
|
||||
pMsg1 := ProducerMessage{Payload: nil}
|
||||
err = rmq.Produce(channelName1, []ProducerMessage{pMsg1})
|
||||
assert.Nil(t, err)
|
||||
|
||||
pMsg2 := ProducerMessage{Payload: []byte{}}
|
||||
err = rmq.Produce(channelName1, []ProducerMessage{pMsg2})
|
||||
assert.Nil(t, err)
|
||||
|
||||
var emptyTargetData []byte
|
||||
pMsg3 := ProducerMessage{Payload: emptyTargetData}
|
||||
err = rmq.Produce(channelName1, []ProducerMessage{pMsg3})
|
||||
assert.Nil(t, err)
|
||||
|
||||
groupName := "test_group"
|
||||
_ = rmq.DestroyConsumerGroup(channelName0, groupName)
|
||||
err = rmq.CreateConsumerGroup(channelName0, groupName)
|
||||
assert.Nil(t, err)
|
||||
cMsgs0, err := rmq.Consume(channelName0, groupName, 1)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, len(cMsgs0), 1)
|
||||
assert.Equal(t, string(cMsgs0[0].Payload), msg0)
|
||||
|
||||
_ = rmq.DestroyConsumerGroup(channelName1, groupName)
|
||||
err = rmq.CreateConsumerGroup(channelName1, groupName)
|
||||
assert.Nil(t, err)
|
||||
cMsgs1, err := rmq.Consume(channelName1, groupName, 3)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 3, len(cMsgs1))
|
||||
assert.Equal(t, emptyTargetData, cMsgs1[0].Payload)
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue