[skip ci]Fix golint warnings in flowgraph (#9234)

Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
pull/9236/head
Xiangyu Wang 2021-10-04 22:38:02 +08:00 committed by GitHub
parent 4b62f177ee
commit a5daaf0755
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 2 deletions

View File

@ -32,6 +32,7 @@ func (inNode *InputNode) IsInputNode() bool {
return true
}
// Start is used to start input msgstream
func (inNode *InputNode) Start() {
inNode.inStream.Start()
}
@ -54,7 +55,7 @@ func (inNode *InputNode) InStream() msgstream.MsgStream {
return inNode.inStream
}
// empty input and return one *Msg
// Operate consume a message pack from msgstream and return
func (inNode *InputNode) Operate(in []Msg) []Msg {
msgPack := inNode.inStream.Consume()

View File

@ -15,10 +15,12 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
)
// Msg is an abstract class that contains a method to get the time tick of this message
type Msg interface {
TimeTick() Timestamp
}
// MsgStreamMsg is a wrapper of TsMsg in flowgraph
type MsgStreamMsg struct {
tsMessages []msgstream.TsMsg
timestampMin Timestamp
@ -27,6 +29,7 @@ type MsgStreamMsg struct {
endPositions []*MsgPosition
}
// GenerateMsgStreamMsg is used to create a new MsgStreamMsg object
func GenerateMsgStreamMsg(tsMessages []msgstream.TsMsg, timestampMin, timestampMax Timestamp, startPos []*MsgPosition, endPos []*MsgPosition) *MsgStreamMsg {
return &MsgStreamMsg{
tsMessages: tsMessages,
@ -37,30 +40,37 @@ func GenerateMsgStreamMsg(tsMessages []msgstream.TsMsg, timestampMin, timestampM
}
}
// TimeTick returns the timetick of this message
func (msMsg *MsgStreamMsg) TimeTick() Timestamp {
return msMsg.timestampMax
}
// DownStreamNodeIdx returns 0
func (msMsg *MsgStreamMsg) DownStreamNodeIdx() int {
return 0
}
// TsMessages returns the origin TsMsg object list
func (msMsg *MsgStreamMsg) TsMessages() []msgstream.TsMsg {
return msMsg.tsMessages
}
// TimestampMin returns the minimal timestamp in the TsMsg list
func (msMsg *MsgStreamMsg) TimestampMin() Timestamp {
return msMsg.timestampMin
}
// TimestampMax returns the maximal timestamp in the TsMsg list
func (msMsg *MsgStreamMsg) TimestampMax() Timestamp {
return msMsg.timestampMax
}
// StartPositions returns the start position of TsMsgs
func (msMsg *MsgStreamMsg) StartPositions() []*MsgPosition {
return msMsg.startPositions
}
// EndPositions returns the end position of TsMsgs
func (msMsg *MsgStreamMsg) EndPositions() []*MsgPosition {
return msMsg.endPositions
}

View File

@ -183,18 +183,22 @@ func (nodeCtx *nodeCtx) collectInputMessages() {
}
}
// MaxQueueLength returns the maximal queue length
func (node *BaseNode) MaxQueueLength() int32 {
return node.maxQueueLength
}
// MaxParallelism returns the maximal parallelism
func (node *BaseNode) MaxParallelism() int32 {
return node.maxParallelism
}
// SetMaxQueueLength is used to set the maximal queue length
func (node *BaseNode) SetMaxQueueLength(n int32) {
node.maxQueueLength = n
}
// SetMaxParallelism is used to set the maximal parallelism
func (node *BaseNode) SetMaxParallelism(n int32) {
node.maxParallelism = n
}
@ -207,5 +211,5 @@ func (node *BaseNode) IsInputNode() bool {
// Start implementing Node, base node does nothing when starts
func (node *BaseNode) Start() {}
// Stop, implementing Node, base node does nothing when stops
// Close implementing Node, base node does nothing when stops
func (node *BaseNode) Close() {}