mirror of https://github.com/milvus-io/milvus.git
parent
8b81ceb5d7
commit
78e8e4aa22
|
@ -11,21 +11,26 @@
|
|||
|
||||
package rocksmq
|
||||
|
||||
// ProducerMessage that will be write to rocksdb
|
||||
type ProducerMessage struct {
|
||||
Payload []byte
|
||||
}
|
||||
|
||||
// Rocksmq consumer
|
||||
type Consumer struct {
|
||||
Topic string
|
||||
GroupName string
|
||||
MsgMutex chan struct{}
|
||||
}
|
||||
|
||||
// ConsumerMessage that consumed from rocksdb
|
||||
type ConsumerMessage struct {
|
||||
MsgID UniqueID
|
||||
Payload []byte
|
||||
}
|
||||
|
||||
// Rocksmq is an interface thatmay be implemented by the application
|
||||
// to do message queue operations based ion rocksdb
|
||||
type RocksMQ interface {
|
||||
CreateTopic(topicName string) error
|
||||
DestroyTopic(topicName string) error
|
||||
|
|
|
@ -27,10 +27,16 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// RocksmqRetentionTimeInMinutes is the time of retention
|
||||
var RocksmqRetentionTimeInMinutes int64
|
||||
|
||||
// RocksmqRetentionSizeInMB is the size of retention
|
||||
var RocksmqRetentionSizeInMB int64
|
||||
|
||||
// TickerTimeInMinutes is the time of expired check
|
||||
var TickerTimeInMinutes int64 = 1
|
||||
|
||||
// Const value that used to convert unit
|
||||
const (
|
||||
MB = 2 << 20
|
||||
MINUTE = 60
|
||||
|
@ -477,6 +483,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
|||
return DeleteMessages(ri.db, topic, startID, endID)
|
||||
}
|
||||
|
||||
// Delte messages in rocksdb by range of [startID, endID)
|
||||
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
|
||||
// Delete msg by range of startID and endID
|
||||
startKey, err := combKey(topic, startID)
|
||||
|
|
Loading…
Reference in New Issue