mirror of https://github.com/milvus-io/milvus.git
parent
40cb7e626b
commit
c4c89a639a
|
@ -24,7 +24,7 @@ import (
|
|||
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
|
||||
)
|
||||
|
||||
// Global rocksmq instance that will be initialized only once
|
||||
// Rmq is global rocksmq instance that will be initialized only once
|
||||
var Rmq *rocksmq
|
||||
|
||||
// once is used to init global rocksmq
|
||||
|
|
|
@ -16,7 +16,7 @@ type ProducerMessage struct {
|
|||
Payload []byte
|
||||
}
|
||||
|
||||
// Rocksmq consumer
|
||||
// Consumer is rocksmq consumer
|
||||
type Consumer struct {
|
||||
Topic string
|
||||
GroupName string
|
||||
|
@ -29,7 +29,7 @@ type ConsumerMessage struct {
|
|||
Payload []byte
|
||||
}
|
||||
|
||||
// Rocksmq is an interface thatmay be implemented by the application
|
||||
// RocksMQ is an interface thatmay be implemented by the application
|
||||
// to do message queue operations based ion rocksdb
|
||||
type RocksMQ interface {
|
||||
CreateTopic(topicName string) error
|
||||
|
|
|
@ -112,6 +112,7 @@ type rocksmq struct {
|
|||
retentionInfo *retentionInfo
|
||||
}
|
||||
|
||||
// NewRocksMQ step:
|
||||
// 1. New rocksmq instance based on rocksdb with name and rocksdbkv with kvname
|
||||
// 2. Init retention info, load retention info to memory
|
||||
// 3. Start retention goroutine
|
||||
|
|
|
@ -279,6 +279,7 @@ func (ri *retentionInfo) retention() error {
|
|||
for {
|
||||
select {
|
||||
case <-ri.ctx.Done():
|
||||
log.Debug("aaaaaaaaaaaaa")
|
||||
return nil
|
||||
case t := <-ticker.C:
|
||||
timeNow := t.Unix()
|
||||
|
@ -483,7 +484,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
|||
return DeleteMessages(ri.db, topic, startID, endID)
|
||||
}
|
||||
|
||||
// Delte messages in rocksdb by range of [startID, endID)
|
||||
// DeleteMessages in rocksdb by range of [startID, endID)
|
||||
func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) error {
|
||||
// Delete msg by range of startID and endID
|
||||
startKey, err := combKey(topic, startID)
|
||||
|
|
Loading…
Reference in New Issue