Fix bug and update doc

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
pull/4973/head^2
zhenshan.cao 2020-10-26 17:27:53 +08:00 committed by yefu.chen
parent eb64839aef
commit 848e375c0d
1 changed files with 11 additions and 8 deletions

View File

@ -442,17 +442,17 @@ type TsMsg interface {
Ts() Timestamp
}
type TsMsgMarshaler interface {
Marshal(input *TsMsg) ([]byte, Status)
Unmarshal(input []byte) (*TsMsg, Status)
}
type MsgPack struct {
BeginTs Timestamp
EndTs Timestamp
Msgs []*TsMsg
}
type TsMsgMarshaler interface {
Marshal(input *TsMsg) ([]byte, Status)
Unmarshal(input []byte) (*TsMsg, Status)
}
type MsgStream interface {
SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler)
Produce(*MsgPack) Status
@ -461,14 +461,17 @@ type MsgStream interface {
type PulsarMsgStream struct {
client *pulsar.Client
produceChannels []string
consumeChannels []string
msgHashFunc (*MsgPack) map[int32]*MsgPack // return a map from produceChannel idx to *MsgPack
producers []*pulsar.Producer
consumers []*pulsar.Consumer
msgMarshaler *TsMsgMarshaler
msgUnmarshaler *TsMsgMarshaler
}
func (ms *PulsarMsgStream) SetProducerChannels(channels []string)
func (ms *PulsarMsgStream) SetConsumerChannels(channels []string)
func (ms *PulsarMsgStream) SetMsgMarshaler(marshal *TsMsgMarshaler, unmarshal *TsMsgMarshaler)
func (ms *PulsarMsgStream) SetMsgHashFunc(XXX)
func (ms *PulsarMsgStream) Produce(*MsgPack) Status
func (ms *PulsarMsgStream) Consume() *MsgPack //return messages in one time tick