mirror of https://github.com/milvus-io/milvus.git
parent
b47ff1f97f
commit
f34e5205f7
|
@ -402,6 +402,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
|
|||
|
||||
/* Step I: Insert data to store system */
|
||||
batch := gorocksdb.NewWriteBatch()
|
||||
defer batch.Destroy()
|
||||
msgSizes := make(map[UniqueID]int64)
|
||||
msgIDs := make([]UniqueID, msgLen)
|
||||
for i := 0; i < msgLen && idStart+UniqueID(i) < idEnd; i++ {
|
||||
|
@ -416,8 +417,9 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
|
|||
msgSizes[msgID] = int64(len(messages[i].Payload))
|
||||
}
|
||||
|
||||
err = rmq.store.Write(gorocksdb.NewDefaultWriteOptions(), batch)
|
||||
batch.Destroy()
|
||||
opts := gorocksdb.NewDefaultWriteOptions()
|
||||
defer opts.Destroy()
|
||||
err = rmq.store.Write(opts, batch)
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: write batch failed")
|
||||
return err
|
||||
|
@ -624,8 +626,9 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
|
|||
return err
|
||||
}
|
||||
|
||||
val, err := rmq.store.Get(gorocksdb.NewDefaultReadOptions(), []byte(storeKey))
|
||||
defer val.Free()
|
||||
opts := gorocksdb.NewDefaultReadOptions()
|
||||
defer opts.Destroy()
|
||||
_, err = rmq.store.Get(opts, []byte(storeKey))
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: get " + storeKey + " failed")
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue