mirror of https://github.com/milvus-io/milvus.git
131 lines
2.5 KiB
Go
131 lines
2.5 KiB
Go
package rocksmq
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/zilliztech/milvus-distributed/internal/errors"
|
|
"github.com/zilliztech/milvus-distributed/internal/kv"
|
|
memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem"
|
|
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
|
)
|
|
|
|
type UniqueID = typeutil.UniqueID
|
|
|
|
type ProducerMessage struct {
|
|
payload []byte
|
|
}
|
|
|
|
type ConsumerMessage struct {
|
|
msgID UniqueID
|
|
payload []byte
|
|
}
|
|
|
|
type Channel struct {
|
|
beginOffset UniqueID
|
|
endOffset UniqueID
|
|
}
|
|
|
|
type ConsumerGroupContext struct {
|
|
currentOffset UniqueID
|
|
}
|
|
|
|
type RocksMQ struct {
|
|
kv kv.Base
|
|
channels map[string]*Channel
|
|
cgCtxs map[string]ConsumerGroupContext
|
|
mu sync.Mutex
|
|
}
|
|
|
|
func NewRocksMQ() *RocksMQ {
|
|
mkv := memkv.NewMemoryKV()
|
|
rmq := &RocksMQ{
|
|
kv: mkv,
|
|
}
|
|
return rmq
|
|
}
|
|
|
|
func (rmq *RocksMQ) checkKeyExist(key string) bool {
|
|
_, err := rmq.kv.Load(key)
|
|
return err == nil
|
|
}
|
|
|
|
func (rmq *RocksMQ) CreateChannel(channelName string) error {
|
|
beginKey := channelName + "/begin_id"
|
|
endKey := channelName + "/end_id"
|
|
|
|
// Check if channel exist
|
|
if rmq.checkKeyExist(beginKey) || rmq.checkKeyExist(endKey) {
|
|
return errors.New("Channel " + channelName + " already exists.")
|
|
}
|
|
|
|
err := rmq.kv.Save(beginKey, "0")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = rmq.kv.Save(endKey, "0")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
channel := &Channel{
|
|
beginOffset: 0,
|
|
endOffset: 0,
|
|
}
|
|
rmq.channels[channelName] = channel
|
|
return nil
|
|
}
|
|
|
|
func (rmq *RocksMQ) DestroyChannel(channelName string) error {
|
|
beginKey := channelName + "/begin_id"
|
|
endKey := channelName + "/end_id"
|
|
|
|
err := rmq.kv.Remove(beginKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = rmq.kv.Remove(endKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (rmq *RocksMQ) CreateConsumerGroup(groupName string, channelName string) error {
|
|
key := groupName + "/" + channelName + "/current_id"
|
|
if rmq.checkKeyExist(key) {
|
|
return errors.New("ConsumerGroup " + groupName + " already exists.")
|
|
}
|
|
err := rmq.kv.Save(key, "0")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (rmq *RocksMQ) DestroyConsumerGroup(groupName string, channelName string) error {
|
|
key := groupName + "/" + channelName + "/current_id"
|
|
|
|
err := rmq.kv.Remove(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) error {
|
|
return nil
|
|
}
|
|
|
|
func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]ConsumerMessage, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (rmq *RocksMQ) Seek(groupName string, channelName string, msgID UniqueID) error {
|
|
return nil
|
|
}
|