milvus/docs/developer_guides/chap04_message_stream.md

7.6 KiB

8. Message Stream

8.2 Message Stream Service API

// Client is the message stream service API. Each method takes a request
// value and returns an error; CreateChannels and DescribeChannels also
// return a response value.
//
// NOTE(review): "Destory" in DestoryChannels / DestoryChannelRequest looks
// like a misspelling of "Destroy" (RocksMQ below uses DestroyChannel); the
// names are kept as-is here because callers depend on them.
type Client interface {
	// CreateChannels handles a CreateChannelRequest.
	CreateChannels(req CreateChannelRequest) (CreateChannelResponse, error)

	// DestoryChannels handles a DestoryChannelRequest.
	DestoryChannels(req DestoryChannelRequest) error

	// DescribeChannels handles a DescribeChannelRequest.
	DescribeChannels(req DescribeChannelRequest) (DescribeChannelResponse, error)
}
  • CreateChannels
// OwnerDescription describes a channel owner. It is embedded in
// CreateChannelRequest and reported back through ChannelDescription.
type OwnerDescription struct {
	Role    string
	Address string
	// Token string
	DescriptionText string
}

// CreateChannelRequest is the argument of Client.CreateChannels.
type CreateChannelRequest struct {
	// OwnerDescription describes the owner on whose behalf the request is made.
	OwnerDescription OwnerDescription
	// NumChannels is presumably the number of channels to create — confirm.
	NumChannels int
}

// CreateChannelResponse is the result of Client.CreateChannels.
type CreateChannelResponse struct {
	// ChannelNames presumably lists the names of the created channels — confirm.
	ChannelNames []string
}
  • DestoryChannels
// DestoryChannelRequest is the argument of Client.DestoryChannels.
// NOTE(review): "Destory" is presumably a misspelling of "Destroy"; kept
// because it is part of the published API name.
type DestoryChannelRequest struct {
	// ChannelNames names the channels the request applies to.
	ChannelNames []string
}
  • DescribeChannels
// DescribeChannelRequest is the argument of Client.DescribeChannels.
type DescribeChannelRequest struct {
	// ChannelNames names the channels the request applies to.
	ChannelNames []string
}

// ChannelDescription pairs a channel name with the description of its owner.
type ChannelDescription struct {
	ChannelName string
	Owner       OwnerDescription
}

// DescribeChannelResponse is the result of Client.DescribeChannels.
type DescribeChannelResponse struct {
	// Descriptions holds one ChannelDescription per reported channel.
	Descriptions []ChannelDescription
}

A.3 Message Stream

  • Overview
  • Interface
// Msg

// MsgType is the numeric code that identifies the kind of a message.
type MsgType uint32

// Message type codes. A grouped constant declaration uses parentheses in Go;
// the brace form previously shown here does not compile.
const (
	kInsert       MsgType = 400
	kDelete       MsgType = 401
	kSearch       MsgType = 500
	kSearchResult MsgType = 1000

	kSegStatistics MsgType = 1100

	kTimeTick MsgType = 1200
	kTimeSync MsgType = 1201
)

// TsMsg is the interface of a message on a message stream. It exposes a
// begin and an end Timestamp, a MsgType, and marshal/unmarshal hooks.
type TsMsg interface {
	// SetTs sets the message timestamp.
	SetTs(ts Timestamp)
	// BeginTs and EndTs report the timestamps of the message.
	BeginTs() Timestamp
	EndTs() Timestamp
	// Type reports the MsgType of the message.
	Type() MsgType
	// Marshal and Unmarshal convert between a TsMsg and an opaque
	// representation (interface{}).
	Marshal(*TsMsg) interface{}
	Unmarshal(interface{}) *TsMsg
}

type MsgPosition {
  ChannelName string
  MsgID string
  TimestampFilter Timestamp
}

// MsgPack is a batch of TsMsg values together with a begin/end timestamp
// and the start/end positions of the batch.
type MsgPack struct {
	BeginTs        Timestamp
	EndTs          Timestamp
	Msgs           []TsMsg
	StartPositions []MsgPosition
	EndPositions   []MsgPosition
}

type RepackFunc(msgs []* TsMsg, hashKeys [][]int32) map[int32] *MsgPack
// Unmarshal

// Interface
// UnmarshalFunc converts an opaque input value into a *TsMsg.
type UnmarshalFunc func(input interface{}) *TsMsg

type UnmarshalDispatcher interface {
    Unmarshal(interface{}, msgType commonpb.MsgType) (msgstream.TsMsg, error)
}

// UnmarshalDispatcherFactory creates UnmarshalDispatcher values.
type UnmarshalDispatcherFactory interface {
	// NewUnmarshalDispatcher returns a pointer to an UnmarshalDispatcher.
	NewUnmarshalDispatcher() *UnmarshalDispatcher
}

// Proto & Mem Implementation
// ProtoUDFactory is an empty struct whose NewUnmarshalDispatcher method has
// the signature required by UnmarshalDispatcherFactory.
type ProtoUDFactory struct {}
// NewUnmarshalDispatcher is listed as a signature only; its body is not shown here.
func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *UnmarshalDispatcher

// MemUDFactory is an empty struct whose NewUnmarshalDispatcher method has
// the signature required by UnmarshalDispatcherFactory.
type MemUDFactory struct {}
// NewUnmarshalDispatcher is listed as a signature only; its body is not shown here.
func (mudf *MemUDFactory) NewUnmarshalDispatcher() *UnmarshalDispatcher
// MsgStream

// Interface
// MsgStream is the common interface of the message streams in this guide.
//
// NOTE(review): the concrete stream types listed in this guide declare
// Start() error and Close() error, and all but PulsarTtMsgStream declare
// Consume() (*MsgPack, error). Those signatures do not match the ones
// below — confirm which form is intended.
type MsgStream interface {
	Start()
	Close()

	// AsProducer / AsConsumer attach the stream to the given channels;
	// consumers additionally take a subscription name.
	AsProducer(channels []string)
	AsConsumer(channels []string, subName string)

	Produce(*MsgPack) error
	Broadcast(*MsgPack) error

	// Consume returns a MsgPack; a message can be consumed exactly once.
	Consume() *MsgPack

	// Seek takes a MsgPosition.
	Seek(mp *MsgPosition) error
}

// MsgStreamFactory creates message streams in two variants: a plain
// stream and a "Tt" stream.
type MsgStreamFactory interface {
	// NewMsgStream returns a pointer to a MsgStream.
	NewMsgStream() *MsgStream
	// NewTtMsgStream returns a pointer to a MsgStream (the "Tt" variant).
	NewTtMsgStream() *MsgStream
}

// Pulsar
// PulsarMsgStreamFactory is the Pulsar factory. Its two methods have the
// signatures required by MsgStreamFactory.
//
// It is declared as an empty struct, not an interface: Go does not allow
// methods to be declared on an interface type, so the receivers below need
// a concrete type.
type PulsarMsgStreamFactory struct{}

func (pmsf *PulsarMsgStreamFactory) NewMsgStream() *MsgStream
func (pmsf *PulsarMsgStreamFactory) NewTtMsgStream() *MsgStream

// RocksMQ
//
// RmqMsgStreamFactory is the RocksMQ factory. Its two methods have the
// signatures required by MsgStreamFactory.
//
// It is declared as an empty struct, not an interface: Go does not allow
// methods to be declared on an interface type, so the receivers below need
// a concrete type.
type RmqMsgStreamFactory struct{}

func (rmsf *RmqMsgStreamFactory) NewMsgStream() *MsgStream
func (rmsf *RmqMsgStreamFactory) NewTtMsgStream() *MsgStream
// PulsarMsgStream

// PulsarMsgStream is the message stream built on a Pulsar client.
type PulsarMsgStream struct {
	client     *pulsar.Client
	repackFunc RepackFunc // set through SetRepackFunc
	producers  []*pulsar.Producer
	consumers  []*pulsar.Consumer
	unmarshal  *UnmarshalDispatcher
}

// Method set of PulsarMsgStream (signatures only; bodies are not shown in
// this guide). The method names match the MsgStream interface, plus
// SetRepackFunc.
// NOTE(review): Start, Close and Consume return an error here, whereas the
// MsgStream interface declares Start(), Close() and Consume() *MsgPack —
// confirm which signatures are intended.
func (ms *PulsarMsgStream) Start() error
func (ms *PulsarMsgStream) Close() error
func (ms *PulsarMsgStream) AsProducer(channels []string)
func (ms *PulsarMsgStream) AsConsumer(channels []string, subName string)
func (ms *PulsarMsgStream) Produce(msgs *MsgPack) error
func (ms *PulsarMsgStream) Broadcast(msgs *MsgPack) error
func (ms *PulsarMsgStream) Consume() (*MsgPack, error)
func (ms *PulsarMsgStream) Seek(mp *MsgPosition) error
func (ms *PulsarMsgStream) SetRepackFunc(repackFunc RepackFunc)

// NewPulsarMsgStream returns a *PulsarMsgStream; it takes a context, a
// Pulsar address and a buffer size.
func NewPulsarMsgStream(ctx context.Context, pulsarAddr string, bufferSize int64) *PulsarMsgStream


// PulsarTtMsgStream is the "Tt" variant of the Pulsar message stream. It
// has the same fields as PulsarMsgStream plus three message buffers.
type PulsarTtMsgStream struct {
	client     *pulsar.Client
	repackFunc RepackFunc // set through SetRepackFunc
	producers  []*pulsar.Producer
	consumers  []*pulsar.Consumer
	unmarshal  *UnmarshalDispatcher

	// Buffers specific to the Tt stream.
	inputBuf    []*TsMsg
	unsolvedBuf []*TsMsg
	msgPacks    []*MsgPack
}

// Method set of PulsarTtMsgStream (signatures only; bodies are not shown in
// this guide). The method names match the MsgStream interface, plus
// SetRepackFunc.
// NOTE(review): Start and Close return an error here, whereas the MsgStream
// interface declares them without a result. Consume matches the interface
// but differs from PulsarMsgStream, RmqMsgStream and RmqTtMsgStream, which
// return (*MsgPack, error) — confirm which signatures are intended.
func (ms *PulsarTtMsgStream) Start() error
func (ms *PulsarTtMsgStream) Close() error
func (ms *PulsarTtMsgStream) AsProducer(channels []string)
func (ms *PulsarTtMsgStream) AsConsumer(channels []string, subName string)
func (ms *PulsarTtMsgStream) Produce(msgs *MsgPack) error
func (ms *PulsarTtMsgStream) Broadcast(msgs *MsgPack) error
func (ms *PulsarTtMsgStream) Consume() *MsgPack // returns the messages of one time tick
func (ms *PulsarTtMsgStream) Seek(mp *MsgPosition) error
func (ms *PulsarTtMsgStream) SetRepackFunc(repackFunc RepackFunc)

// NewPulsarTtMsgStream returns a *PulsarTtMsgStream; it takes a context, a
// Pulsar address and a buffer size.
func NewPulsarTtMsgStream(ctx context.Context, pulsarAddr string, bufferSize int64) *PulsarTtMsgStream

// RmqMsgStream

// RmqMsgStream is the message stream built on RocksMQ.
//
// NOTE(review): the package qualifier is spelled "rockermq" while the type
// it qualifies is RocksMQ — confirm the actual package name.
type RmqMsgStream struct {
	client     *rockermq.RocksMQ
	repackFunc RepackFunc // set through SetRepackFunc
	producers  []string
	consumers  []string
	subName    string
	unmarshal  *UnmarshalDispatcher
}

// Method set of RmqMsgStream (signatures only; bodies are not shown in this
// guide). The method names match the MsgStream interface, plus
// SetRepackFunc.
// NOTE(review): Start, Close and Consume return an error here, whereas the
// MsgStream interface declares Start(), Close() and Consume() *MsgPack —
// confirm which signatures are intended.
func (ms *RmqMsgStream) Start() error
func (ms *RmqMsgStream) Close() error
func (ms *RmqMsgStream) AsProducer(channels []string)
func (ms *RmqMsgStream) AsConsumer(channels []string, subName string)
func (ms *RmqMsgStream) Produce(msgs *MsgPack) error
func (ms *RmqMsgStream) Broadcast(msgs *MsgPack) error
func (ms *RmqMsgStream) Consume() (*MsgPack, error)
func (ms *RmqMsgStream) Seek(mp *MsgPosition) error
func (ms *RmqMsgStream) SetRepackFunc(repackFunc RepackFunc)

// NewRmqMsgStream returns a *RmqMsgStream; it takes only a context.
func NewRmqMsgStream(ctx context.Context) *RmqMsgStream

// RmqTtMsgStream is the "Tt" variant of the RocksMQ message stream. Its
// fields are the same as those of RmqMsgStream.
//
// NOTE(review): the package qualifier is spelled "rockermq" while the type
// it qualifies is RocksMQ — confirm the actual package name.
type RmqTtMsgStream struct {
	client     *rockermq.RocksMQ
	repackFunc RepackFunc // set through SetRepackFunc
	producers  []string
	consumers  []string
	subName    string
	unmarshal  *UnmarshalDispatcher
}

// Method set of RmqTtMsgStream (signatures only; bodies are not shown in
// this guide). The method names match the MsgStream interface, plus
// SetRepackFunc.
// NOTE(review): Start, Close and Consume return an error here, whereas the
// MsgStream interface declares Start(), Close() and Consume() *MsgPack —
// confirm which signatures are intended.
func (ms *RmqTtMsgStream) Start() error
func (ms *RmqTtMsgStream) Close() error
func (ms *RmqTtMsgStream) AsProducer(channels []string)
func (ms *RmqTtMsgStream) AsConsumer(channels []string, subName string)
func (ms *RmqTtMsgStream) Produce(msgs *MsgPack) error
func (ms *RmqTtMsgStream) Broadcast(msgs *MsgPack) error
func (ms *RmqTtMsgStream) Consume() (*MsgPack, error)
func (ms *RmqTtMsgStream) Seek(mp *MsgPosition) error
func (ms *RmqTtMsgStream) SetRepackFunc(repackFunc RepackFunc)

// NewRmqTtMsgStream returns a *RmqTtMsgStream; it takes only a context.
func NewRmqTtMsgStream(ctx context.Context) *RmqTtMsgStream

A.4 RocksMQ

RocksMQ is a RocksDB-based messaging/streaming library.

// All the following UniqueIDs are 64-bit integers, each composed of a timestamp and an increasing sequence number

// ProducerMessage is a message handed to RocksMQ.Produce; it carries only
// a raw payload.
type ProducerMessage struct {
	payload []byte
}

// ConsumerMessage is a message returned by RocksMQ.Consume; it carries a
// message ID alongside the raw payload.
type ConsumerMessage struct {
	msgID   UniqueID
	payload []byte
}

// IDAllocator hands out UniqueID values. RocksMQ holds one, and NewRocksMQ
// takes one as an argument.
type IDAllocator interface {
	// Alloc takes a count and returns two UniqueIDs and an error —
	// presumably the bounds of the allocated ID range; confirm.
	Alloc(count uint32) (UniqueID, UniqueID, error)
	// AllocOne returns a single UniqueID.
	AllocOne() (UniqueID, error)
	// UpdateID returns only an error; what it updates is not shown here.
	UpdateID() error
}

// Every collection has its RocksMQ
// RocksMQ is the RocksDB-based message queue. It holds a RocksDB handle, a
// kv store, an ID allocator, and one mutex each for the produce and the
// consume side.
type RocksMQ struct {
	store       *gorocksdb.DB
	kv          kv.Base
	idAllocator IDAllocator
	produceMu   sync.Mutex
	consumeMu   sync.Mutex
}

// Method set of RocksMQ (signatures only; bodies are not shown in this
// guide): channel and consumer-group lifecycle, then Produce, Consume and Seek.
func (rmq *RocksMQ) CreateChannel(channelName string) error
func (rmq *RocksMQ) DestroyChannel(channelName string) error
func (rmq *RocksMQ) CreateConsumerGroup(groupName string) error
func (rmq *RocksMQ) DestroyConsumerGroup(groupName string) error
func (rmq *RocksMQ) Produce(channelName string, messages []ProducerMessage) error
// Consume takes a group, a channel and a count n, and returns a slice of ConsumerMessage.
func (rmq *RocksMQ) Consume(groupName string, channelName string, n int) ([]ConsumerMessage, error)
// NOTE(review): MessageID is not defined anywhere in this guide, while
// ConsumerMessage.msgID is a UniqueID — confirm whether msgID should be UniqueID.
func (rmq *RocksMQ) Seek(groupName string, channelName string, msgID MessageID) error

// NewRocksMQ returns a *RocksMQ and an error; it takes a name and an IDAllocator.
func NewRocksMQ(name string, idAllocator IDAllocator) (*RocksMQ, error)
A.4.1 Meta (stored in Etcd)
// channel meta
"$(channel_name)/begin_id", UniqueID
"$(channel_name)/end_id", UniqueID

// consumer group meta
"$(group_name)/$(channel_name)/current_id", UniqueID
A.4.2 Data (stored in RocksDB)
  • data
"$(channel_name)/$(unique_id)", []byte