mirror of https://github.com/milvus-io/milvus.git
Add comments for exposed structures and functions (#8435)
Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>pull/8436/head
parent
93c945d076
commit
eb413f1396
|
@ -22,6 +22,7 @@ import (
|
|||
rocksmqserver "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
|
||||
)
|
||||
|
||||
// PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go)
|
||||
type PmsFactory struct {
|
||||
dispatcherFactory ProtoUDFactory
|
||||
// the following members must be public, so that mapstructure.Decode() can access them
|
||||
|
@ -30,6 +31,7 @@ type PmsFactory struct {
|
|||
PulsarBufSize int64
|
||||
}
|
||||
|
||||
// SetParams is used to set parameters for PmsFactory
|
||||
func (f *PmsFactory) SetParams(params map[string]interface{}) error {
|
||||
err := mapstructure.Decode(params, f)
|
||||
if err != nil {
|
||||
|
@ -38,6 +40,7 @@ func (f *PmsFactory) SetParams(params map[string]interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// NewMsgStream is used to generate a new Msgstream object
|
||||
func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) {
|
||||
pulsarClient, err := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: f.PulsarAddress})
|
||||
if err != nil {
|
||||
|
@ -46,6 +49,7 @@ func (f *PmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) {
|
|||
return NewMqMsgStream(ctx, f.ReceiveBufSize, f.PulsarBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher())
|
||||
}
|
||||
|
||||
// NewTtMsgStream is used to generate a new TtMsgstream object
|
||||
func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) {
|
||||
pulsarClient, err := mqclient.GetPulsarClientInstance(pulsar.ClientOptions{URL: f.PulsarAddress})
|
||||
if err != nil {
|
||||
|
@ -54,10 +58,12 @@ func (f *PmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) {
|
|||
return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.PulsarBufSize, pulsarClient, f.dispatcherFactory.NewUnmarshalDispatcher())
|
||||
}
|
||||
|
||||
// NewQueryMsgStream is used to generate a new QueryMsgstream object
|
||||
func (f *PmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) {
|
||||
return f.NewMsgStream(ctx)
|
||||
}
|
||||
|
||||
// NewPmsFactory is used to generate a new PmsFactory object
|
||||
func NewPmsFactory() Factory {
|
||||
f := &PmsFactory{
|
||||
dispatcherFactory: ProtoUDFactory{},
|
||||
|
@ -67,6 +73,7 @@ func NewPmsFactory() Factory {
|
|||
return f
|
||||
}
|
||||
|
||||
// RmsFactory is a rocksmq msgstream factory that implemented Factory interface(msgstream.go)
|
||||
type RmsFactory struct {
|
||||
dispatcherFactory ProtoUDFactory
|
||||
// the following members must be public, so that mapstructure.Decode() can access them
|
||||
|
@ -74,6 +81,7 @@ type RmsFactory struct {
|
|||
RmqBufSize int64
|
||||
}
|
||||
|
||||
// SetParams is used to set parameters for RmsFactory
|
||||
func (f *RmsFactory) SetParams(params map[string]interface{}) error {
|
||||
err := mapstructure.Decode(params, f)
|
||||
if err != nil {
|
||||
|
@ -82,6 +90,7 @@ func (f *RmsFactory) SetParams(params map[string]interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// NewMsgStream is used to generate a new Msgstream object
|
||||
func (f *RmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) {
|
||||
rmqClient, err := mqclient.NewRmqClient(rocksmq.ClientOptions{Server: rocksmqserver.Rmq})
|
||||
if err != nil {
|
||||
|
@ -90,6 +99,7 @@ func (f *RmsFactory) NewMsgStream(ctx context.Context) (MsgStream, error) {
|
|||
return NewMqMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, rmqClient, f.dispatcherFactory.NewUnmarshalDispatcher())
|
||||
}
|
||||
|
||||
// NewTtMsgStream is used to generate a new TtMsgstream object
|
||||
func (f *RmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) {
|
||||
rmqClient, err := mqclient.NewRmqClient(rocksmq.ClientOptions{Server: rocksmqserver.Rmq})
|
||||
if err != nil {
|
||||
|
@ -98,6 +108,7 @@ func (f *RmsFactory) NewTtMsgStream(ctx context.Context) (MsgStream, error) {
|
|||
return NewMqTtMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, rmqClient, f.dispatcherFactory.NewUnmarshalDispatcher())
|
||||
}
|
||||
|
||||
// NewQueryMsgStream is used to generate a new QueryMsgstream object
|
||||
func (f *RmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) {
|
||||
rmqClient, err := mqclient.NewRmqClient(rocksmq.ClientOptions{Server: rocksmqserver.Rmq})
|
||||
if err != nil {
|
||||
|
@ -106,6 +117,7 @@ func (f *RmsFactory) NewQueryMsgStream(ctx context.Context) (MsgStream, error) {
|
|||
return NewMqMsgStream(ctx, f.ReceiveBufSize, f.RmqBufSize, rmqClient, f.dispatcherFactory.NewUnmarshalDispatcher())
|
||||
}
|
||||
|
||||
// NewRmsFactory is used to generate a new RmsFactory object
|
||||
func NewRmsFactory() Factory {
|
||||
f := &RmsFactory{
|
||||
dispatcherFactory: ProtoUDFactory{},
|
||||
|
|
|
@ -49,6 +49,7 @@ type mqMsgStream struct {
|
|||
consumerLock *sync.Mutex
|
||||
}
|
||||
|
||||
// NewMqMsgStream is used to generate a new mqMsgStream object
|
||||
func NewMqMsgStream(ctx context.Context,
|
||||
receiveBufSize int64,
|
||||
bufSize int64,
|
||||
|
@ -417,6 +418,7 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// MqTtMsgStream is a msgstream that contains timeticks
|
||||
type MqTtMsgStream struct {
|
||||
mqMsgStream
|
||||
chanMsgBuf map[mqclient.Consumer][]TsMsg
|
||||
|
@ -430,6 +432,7 @@ type MqTtMsgStream struct {
|
|||
syncConsumer chan int
|
||||
}
|
||||
|
||||
// NewMqTtMsgStream is used to generate a new MqTtMsgStream object
|
||||
func NewMqTtMsgStream(ctx context.Context,
|
||||
receiveBufSize int64,
|
||||
bufSize int64,
|
||||
|
@ -509,6 +512,7 @@ func (ms *MqTtMsgStream) AsConsumer(channels []string, subName string) {
|
|||
}
|
||||
}
|
||||
|
||||
// Start will start a goroutine which keep carrying msg from pulsar/rocksmq to golang chan
|
||||
func (ms *MqTtMsgStream) Start() {
|
||||
if ms.consumers != nil {
|
||||
ms.wait.Add(1)
|
||||
|
@ -516,6 +520,7 @@ func (ms *MqTtMsgStream) Start() {
|
|||
}
|
||||
}
|
||||
|
||||
// Close will stop goroutine and free internal producers and consumers
|
||||
func (ms *MqTtMsgStream) Close() {
|
||||
ms.streamCancel()
|
||||
close(ms.syncConsumer)
|
||||
|
|
|
@ -23,6 +23,7 @@ type Timestamp = typeutil.Timestamp
|
|||
type IntPrimaryKey = typeutil.IntPrimaryKey
|
||||
type MsgPosition = internalpb.MsgPosition
|
||||
|
||||
// MsgPack represents a batch of msg in msgstream
|
||||
type MsgPack struct {
|
||||
BeginTs Timestamp
|
||||
EndTs Timestamp
|
||||
|
@ -33,6 +34,7 @@ type MsgPack struct {
|
|||
|
||||
type RepackFunc func(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)
|
||||
|
||||
// MsgStream is an interface that can be used to produce and consume message on message queue
|
||||
type MsgStream interface {
|
||||
Start()
|
||||
Close()
|
||||
|
@ -48,6 +50,7 @@ type MsgStream interface {
|
|||
Seek(offset []*MsgPosition) error
|
||||
}
|
||||
|
||||
// Factory is an interface that can be used to generate a new msgstream object
|
||||
type Factory interface {
|
||||
SetParams(params map[string]interface{}) error
|
||||
NewMsgStream(ctx context.Context) (MsgStream, error)
|
||||
|
|
Loading…
Reference in New Issue