Fix golint warnings in msg.go (#8590)

Signed-off-by: Xiangyu Wang <xiangyu.wang@zilliz.com>
pull/8598/head
Xiangyu Wang 2021-09-26 17:36:07 +08:00 committed by GitHub
parent 607e16f60f
commit cbe8c03224
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 158 additions and 23 deletions

View File

@ -236,7 +236,7 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
return err
}
m, err := ConvertToByteArray(mb)
m, err := convertToByteArray(mb)
if err != nil {
return err
}
@ -275,7 +275,7 @@ func (ms *mqMsgStream) Broadcast(msgPack *MsgPack) error {
return err
}
m, err := ConvertToByteArray(mb)
m, err := convertToByteArray(mb)
if err != nil {
return err
}

View File

@ -22,9 +22,13 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
// MsgType is an alias ofo commonpb.MsgType
type MsgType = commonpb.MsgType
// MarshalType is an empty interface
type MarshalType = interface{}
// TsMsg provides methods to get begin timestamp and end timestamp of a message pack
type TsMsg interface {
TraceCtx() context.Context
SetTraceCtx(ctx context.Context)
@ -40,6 +44,7 @@ type TsMsg interface {
SetPosition(*MsgPosition)
}
// BaseMsg is a basic structure that contains begin timestamp, end timestamp and the position of msgstream
type BaseMsg struct {
Ctx context.Context
BeginTimestamp Timestamp
@ -48,35 +53,42 @@ type BaseMsg struct {
MsgPosition *MsgPosition
}
// TraceCtx returns the context of opentracing
func (bm *BaseMsg) TraceCtx() context.Context {
return bm.Ctx
}
// SetTraceCtx is used to set context for opentracing
func (bm *BaseMsg) SetTraceCtx(ctx context.Context) {
bm.Ctx = ctx
}
// BeginTs returns the begin timestamp of this message pack
func (bm *BaseMsg) BeginTs() Timestamp {
return bm.BeginTimestamp
}
// EndTs returns the end timestamp of this message pack
func (bm *BaseMsg) EndTs() Timestamp {
return bm.EndTimestamp
}
// HashKeys returns the end timestamp of this message pack
func (bm *BaseMsg) HashKeys() []uint32 {
return bm.HashValues
}
// Position returns the position of this message pack in msgstream
func (bm *BaseMsg) Position() *MsgPosition {
return bm.MsgPosition
}
// SetPosition is used to set position of this message in msgstream
func (bm *BaseMsg) SetPosition(position *MsgPosition) {
bm.MsgPosition = position
}
func ConvertToByteArray(input interface{}) ([]byte, error) {
func convertToByteArray(input interface{}) ([]byte, error) {
switch output := input.(type) {
case []byte:
return output, nil
@ -86,6 +98,8 @@ func ConvertToByteArray(input interface{}) ([]byte, error) {
}
/////////////////////////////////////////Insert//////////////////////////////////////////
// InsertMsg is a message pack that contains insert request
type InsertMsg struct {
BaseMsg
internalpb.InsertRequest
@ -94,18 +108,22 @@ type InsertMsg struct {
// interface implementation validation
var _ TsMsg = &InsertMsg{}
// ID returns the ID of this message pack
func (it *InsertMsg) ID() UniqueID {
return it.Base.MsgID
}
// Type returns the type of this message pack
func (it *InsertMsg) Type() MsgType {
return it.Base.MsgType
}
// SourceID indicated which component generated this message
func (it *InsertMsg) SourceID() int64 {
return it.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error) {
insertMsg := input.(*InsertMsg)
insertRequest := &insertMsg.InsertRequest
@ -116,9 +134,10 @@ func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (it *InsertMsg) Unmarshal(input MarshalType) (TsMsg, error) {
insertRequest := internalpb.InsertRequest{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -145,6 +164,8 @@ func (it *InsertMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
/////////////////////////////////////////Delete//////////////////////////////////////////
// DeleteMsg is a message pack that contains delete request
type DeleteMsg struct {
BaseMsg
internalpb.DeleteRequest
@ -153,18 +174,22 @@ type DeleteMsg struct {
// interface implementation validation
var _ TsMsg = &DeleteMsg{}
// ID returns the ID of this message pack
func (dt *DeleteMsg) ID() UniqueID {
return dt.Base.MsgID
}
// Type returns the type of this message pack
func (dt *DeleteMsg) Type() MsgType {
return dt.Base.MsgType
}
// SourceID indicated which component generated this message
func (dt *DeleteMsg) SourceID() int64 {
return dt.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error) {
deleteMsg := input.(*DeleteMsg)
deleteRequest := &deleteMsg.DeleteRequest
@ -176,9 +201,10 @@ func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error) {
deleteRequest := internalpb.DeleteRequest{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -194,6 +220,8 @@ func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
/////////////////////////////////////////Search//////////////////////////////////////////
// SearchMsg is a message pack that contains search request
type SearchMsg struct {
BaseMsg
internalpb.SearchRequest
@ -202,26 +230,34 @@ type SearchMsg struct {
// interface implementation validation
var _ TsMsg = &SearchMsg{}
// ID returns the ID of this message pack
func (st *SearchMsg) ID() UniqueID {
return st.Base.MsgID
}
// Type returns the type of this message pack
func (st *SearchMsg) Type() MsgType {
return st.Base.MsgType
}
// SourceID indicated which component generated this message
func (st *SearchMsg) SourceID() int64 {
return st.Base.SourceID
}
// GuaranteeTs returns the guarantee timestamp that querynode can perform this search request. This timestamp
// filled in client(e.g. pymilvus). The timestamp will be 0 if client never execute any insert, otherwise equals
// the timestamp from last insert response.
func (st *SearchMsg) GuaranteeTs() Timestamp {
return st.GetGuaranteeTimestamp()
}
// TravelTs returns the timestamp of a time travel search request
func (st *SearchMsg) TravelTs() Timestamp {
return st.GetTravelTimestamp()
}
// Marshal is used to serializing a message pack to byte array
func (st *SearchMsg) Marshal(input TsMsg) (MarshalType, error) {
searchTask := input.(*SearchMsg)
searchRequest := &searchTask.SearchRequest
@ -232,9 +268,10 @@ func (st *SearchMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (st *SearchMsg) Unmarshal(input MarshalType) (TsMsg, error) {
searchRequest := internalpb.SearchRequest{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -250,6 +287,8 @@ func (st *SearchMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
/////////////////////////////////////////SearchResult//////////////////////////////////////////
// SearchResultMsg is a message pack that contains the result of search request
type SearchResultMsg struct {
BaseMsg
internalpb.SearchResults
@ -258,18 +297,22 @@ type SearchResultMsg struct {
// interface implementation validation
var _ TsMsg = &SearchResultMsg{}
// ID returns the ID of this message pack
func (srt *SearchResultMsg) ID() UniqueID {
return srt.Base.MsgID
}
// Type returns the type of this message pack
func (srt *SearchResultMsg) Type() MsgType {
return srt.Base.MsgType
}
// SourceID indicated which component generated this message
func (srt *SearchResultMsg) SourceID() int64 {
return srt.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (srt *SearchResultMsg) Marshal(input TsMsg) (MarshalType, error) {
searchResultTask := input.(*SearchResultMsg)
searchResultRequest := &searchResultTask.SearchResults
@ -280,9 +323,10 @@ func (srt *SearchResultMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (srt *SearchResultMsg) Unmarshal(input MarshalType) (TsMsg, error) {
searchResultRequest := internalpb.SearchResults{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -298,6 +342,8 @@ func (srt *SearchResultMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
////////////////////////////////////////Retrieve/////////////////////////////////////////
// RetrieveMsg is a message pack that contains retrieve request
type RetrieveMsg struct {
BaseMsg
internalpb.RetrieveRequest
@ -306,26 +352,34 @@ type RetrieveMsg struct {
// interface implementation validation
var _ TsMsg = &RetrieveMsg{}
// ID returns the ID of this message pack
func (rm *RetrieveMsg) ID() UniqueID {
return rm.Base.MsgID
}
// Type returns the type of this message pack
func (rm *RetrieveMsg) Type() MsgType {
return rm.Base.MsgType
}
// SourceID indicated which component generated this message
func (rm *RetrieveMsg) SourceID() int64 {
return rm.Base.SourceID
}
// GuaranteeTs returns the guarantee timestamp that querynode can perform this query request. This timestamp
// filled in client(e.g. pymilvus). The timestamp will be 0 if client never execute any insert, otherwise equals
// the timestamp from last insert response.
func (rm *RetrieveMsg) GuaranteeTs() Timestamp {
return rm.GetGuaranteeTimestamp()
}
// TravelTs returns the timestamp of a time travel query request
func (rm *RetrieveMsg) TravelTs() Timestamp {
return rm.GetTravelTimestamp()
}
// Marshal is used to serializing a message pack to byte array
func (rm *RetrieveMsg) Marshal(input TsMsg) (MarshalType, error) {
retrieveTask := input.(*RetrieveMsg)
retrieveRequest := &retrieveTask.RetrieveRequest
@ -336,9 +390,10 @@ func (rm *RetrieveMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (rm *RetrieveMsg) Unmarshal(input MarshalType) (TsMsg, error) {
retrieveRequest := internalpb.RetrieveRequest{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -354,6 +409,8 @@ func (rm *RetrieveMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
//////////////////////////////////////RetrieveResult///////////////////////////////////////
// RetrieveResultMsg is a message pack that contains the result of query request
type RetrieveResultMsg struct {
BaseMsg
internalpb.RetrieveResults
@ -362,18 +419,22 @@ type RetrieveResultMsg struct {
// interface implementation validation
var _ TsMsg = &RetrieveResultMsg{}
// ID returns the ID of this message pack
func (rrm *RetrieveResultMsg) ID() UniqueID {
return rrm.Base.MsgID
}
// Type returns the type of this message pack
func (rrm *RetrieveResultMsg) Type() MsgType {
return rrm.Base.MsgType
}
// SourceID indicated which component generated this message
func (rrm *RetrieveResultMsg) SourceID() int64 {
return rrm.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (rrm *RetrieveResultMsg) Marshal(input TsMsg) (MarshalType, error) {
retrieveResultTask := input.(*RetrieveResultMsg)
retrieveResultRequest := &retrieveResultTask.RetrieveResults
@ -384,9 +445,10 @@ func (rrm *RetrieveResultMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (rrm *RetrieveResultMsg) Unmarshal(input MarshalType) (TsMsg, error) {
retrieveResultRequest := internalpb.RetrieveResults{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -402,6 +464,8 @@ func (rrm *RetrieveResultMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
/////////////////////////////////////////TimeTick//////////////////////////////////////////
// TimeTickMsg is a message pack that contains time tick only
type TimeTickMsg struct {
BaseMsg
internalpb.TimeTickMsg
@ -410,18 +474,22 @@ type TimeTickMsg struct {
// interface implementation validation
var _ TsMsg = &TimeTickMsg{}
// ID returns the ID of this message pack
func (tst *TimeTickMsg) ID() UniqueID {
return tst.Base.MsgID
}
// Type returns the type of this message pack
func (tst *TimeTickMsg) Type() MsgType {
return tst.Base.MsgType
}
// SourceID indicated which component generated this message
func (tst *TimeTickMsg) SourceID() int64 {
return tst.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (tst *TimeTickMsg) Marshal(input TsMsg) (MarshalType, error) {
timeTickTask := input.(*TimeTickMsg)
timeTick := &timeTickTask.TimeTickMsg
@ -432,9 +500,10 @@ func (tst *TimeTickMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (tst *TimeTickMsg) Unmarshal(input MarshalType) (TsMsg, error) {
timeTickMsg := internalpb.TimeTickMsg{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -450,6 +519,8 @@ func (tst *TimeTickMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
/////////////////////////////////////////QueryNodeStats//////////////////////////////////////////
// QueryNodeStatsMsg is a message pack that contains statistic from querynode
// GOOSE TODO: remove QueryNodeStats
type QueryNodeStatsMsg struct {
BaseMsg
@ -459,26 +530,32 @@ type QueryNodeStatsMsg struct {
// interface implementation validation
var _ TsMsg = &QueryNodeStatsMsg{}
// TraceCtx returns the context of opentracing
func (qs *QueryNodeStatsMsg) TraceCtx() context.Context {
return qs.BaseMsg.Ctx
}
// SetTraceCtx is used to set context for opentracing
func (qs *QueryNodeStatsMsg) SetTraceCtx(ctx context.Context) {
qs.BaseMsg.Ctx = ctx
}
// ID returns the ID of this message pack
func (qs *QueryNodeStatsMsg) ID() UniqueID {
return qs.Base.MsgID
}
// Type returns the type of this message pack
func (qs *QueryNodeStatsMsg) Type() MsgType {
return qs.Base.MsgType
}
// SourceID indicated which component generated this message
func (qs *QueryNodeStatsMsg) SourceID() int64 {
return qs.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (qs *QueryNodeStatsMsg) Marshal(input TsMsg) (MarshalType, error) {
queryNodeSegStatsTask := input.(*QueryNodeStatsMsg)
queryNodeSegStats := &queryNodeSegStatsTask.QueryNodeStats
@ -489,9 +566,10 @@ func (qs *QueryNodeStatsMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (qs *QueryNodeStatsMsg) Unmarshal(input MarshalType) (TsMsg, error) {
queryNodeSegStats := internalpb.QueryNodeStats{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -505,6 +583,8 @@ func (qs *QueryNodeStatsMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
/////////////////////////////////////////SegmentStatisticsMsg//////////////////////////////////////////
// SegmentStatisticsMsg is a message pack that contains segment statistic
type SegmentStatisticsMsg struct {
BaseMsg
internalpb.SegmentStatistics
@ -513,26 +593,32 @@ type SegmentStatisticsMsg struct {
// interface implementation validation
var _ TsMsg = &SegmentStatisticsMsg{}
// TraceCtx returns the context of opentracing
func (ss *SegmentStatisticsMsg) TraceCtx() context.Context {
return ss.BaseMsg.Ctx
}
// SetTraceCtx is used to set context for opentracing
func (ss *SegmentStatisticsMsg) SetTraceCtx(ctx context.Context) {
ss.BaseMsg.Ctx = ctx
}
// ID returns the ID of this message pack
func (ss *SegmentStatisticsMsg) ID() UniqueID {
return ss.Base.MsgID
}
// Type returns the type of this message pack
func (ss *SegmentStatisticsMsg) Type() MsgType {
return ss.Base.MsgType
}
// SourceID indicated which component generated this message
func (ss *SegmentStatisticsMsg) SourceID() int64 {
return ss.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (ss *SegmentStatisticsMsg) Marshal(input TsMsg) (MarshalType, error) {
segStatsTask := input.(*SegmentStatisticsMsg)
segStats := &segStatsTask.SegmentStatistics
@ -543,9 +629,10 @@ func (ss *SegmentStatisticsMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (ss *SegmentStatisticsMsg) Unmarshal(input MarshalType) (TsMsg, error) {
segStats := internalpb.SegmentStatistics{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -559,6 +646,8 @@ func (ss *SegmentStatisticsMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
/////////////////////////////////////////CreateCollection//////////////////////////////////////////
// CreateCollectionMsg is a message pack that contains create collection request
type CreateCollectionMsg struct {
BaseMsg
internalpb.CreateCollectionRequest
@ -567,18 +656,22 @@ type CreateCollectionMsg struct {
// interface implementation validation
var _ TsMsg = &CreateCollectionMsg{}
// ID returns the ID of this message pack
func (cc *CreateCollectionMsg) ID() UniqueID {
return cc.Base.MsgID
}
// Type returns the type of this message pack
func (cc *CreateCollectionMsg) Type() MsgType {
return cc.Base.MsgType
}
// SourceID indicated which component generated this message
func (cc *CreateCollectionMsg) SourceID() int64 {
return cc.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error) {
createCollectionMsg := input.(*CreateCollectionMsg)
createCollectionRequest := &createCollectionMsg.CreateCollectionRequest
@ -589,9 +682,10 @@ func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (cc *CreateCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) {
createCollectionRequest := internalpb.CreateCollectionRequest{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -607,6 +701,8 @@ func (cc *CreateCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
/////////////////////////////////////////DropCollection//////////////////////////////////////////
// DropCollectionMsg is a message pack that contains drop collection request
type DropCollectionMsg struct {
BaseMsg
internalpb.DropCollectionRequest
@ -615,18 +711,22 @@ type DropCollectionMsg struct {
// interface implementation validation
var _ TsMsg = &DropCollectionMsg{}
// ID returns the ID of this message pack
func (dc *DropCollectionMsg) ID() UniqueID {
return dc.Base.MsgID
}
// Type returns the type of this message pack
func (dc *DropCollectionMsg) Type() MsgType {
return dc.Base.MsgType
}
// SourceID indicated which component generated this message
func (dc *DropCollectionMsg) SourceID() int64 {
return dc.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error) {
dropCollectionMsg := input.(*DropCollectionMsg)
dropCollectionRequest := &dropCollectionMsg.DropCollectionRequest
@ -637,9 +737,10 @@ func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (dc *DropCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) {
dropCollectionRequest := internalpb.DropCollectionRequest{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -655,6 +756,8 @@ func (dc *DropCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
/////////////////////////////////////////CreatePartition//////////////////////////////////////////
// CreatePartitionMsg is a message pack that contains create partition request
type CreatePartitionMsg struct {
BaseMsg
internalpb.CreatePartitionRequest
@ -663,18 +766,22 @@ type CreatePartitionMsg struct {
// interface implementation validation
var _ TsMsg = &CreatePartitionMsg{}
// ID returns the ID of this message pack
func (cp *CreatePartitionMsg) ID() UniqueID {
return cp.Base.MsgID
}
// Type returns the type of this message pack
func (cp *CreatePartitionMsg) Type() MsgType {
return cp.Base.MsgType
}
// SourceID indicated which component generated this message
func (cp *CreatePartitionMsg) SourceID() int64 {
return cp.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (cp *CreatePartitionMsg) Marshal(input TsMsg) (MarshalType, error) {
createPartitionMsg := input.(*CreatePartitionMsg)
createPartitionRequest := &createPartitionMsg.CreatePartitionRequest
@ -685,9 +792,10 @@ func (cp *CreatePartitionMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (cp *CreatePartitionMsg) Unmarshal(input MarshalType) (TsMsg, error) {
createPartitionRequest := internalpb.CreatePartitionRequest{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -703,6 +811,8 @@ func (cp *CreatePartitionMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
/////////////////////////////////////////DropPartition//////////////////////////////////////////
// DropPartitionMsg is a message pack that contains drop partition request
type DropPartitionMsg struct {
BaseMsg
internalpb.DropPartitionRequest
@ -711,18 +821,22 @@ type DropPartitionMsg struct {
// interface implementation validation
var _ TsMsg = &DropPartitionMsg{}
// ID returns the ID of this message pack
func (dp *DropPartitionMsg) ID() UniqueID {
return dp.Base.MsgID
}
// Type returns the type of this message pack
func (dp *DropPartitionMsg) Type() MsgType {
return dp.Base.MsgType
}
// SourceID indicated which component generated this message
func (dp *DropPartitionMsg) SourceID() int64 {
return dp.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (dp *DropPartitionMsg) Marshal(input TsMsg) (MarshalType, error) {
dropPartitionMsg := input.(*DropPartitionMsg)
dropPartitionRequest := &dropPartitionMsg.DropPartitionRequest
@ -733,9 +847,10 @@ func (dp *DropPartitionMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (dp *DropPartitionMsg) Unmarshal(input MarshalType) (TsMsg, error) {
dropPartitionRequest := internalpb.DropPartitionRequest{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -758,26 +873,32 @@ type LoadIndexMsg struct {
internalpb.LoadIndex
}
// TraceCtx returns the context of opentracing
func (lim *LoadIndexMsg) TraceCtx() context.Context {
return lim.BaseMsg.Ctx
}
// SetTraceCtx is used to set context for opentracing
func (lim *LoadIndexMsg) SetTraceCtx(ctx context.Context) {
lim.BaseMsg.Ctx = ctx
}
// ID returns the ID of this message pack
func (lim *LoadIndexMsg) ID() UniqueID {
return lim.Base.MsgID
}
// Type returns the type of this message pack
func (lim *LoadIndexMsg) Type() MsgType {
return lim.Base.MsgType
}
// SourceID indicated which component generated this message
func (lim *LoadIndexMsg) SourceID() int64 {
return lim.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (lim *LoadIndexMsg) Marshal(input TsMsg) (MarshalType, error) {
loadIndexMsg := input.(*LoadIndexMsg)
loadIndexRequest := &loadIndexMsg.LoadIndex
@ -788,9 +909,10 @@ func (lim *LoadIndexMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (lim *LoadIndexMsg) Unmarshal(input MarshalType) (TsMsg, error) {
loadIndexRequest := internalpb.LoadIndex{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -805,6 +927,8 @@ func (lim *LoadIndexMsg) Unmarshal(input MarshalType) (TsMsg, error) {
*/
/////////////////////////////////////////LoadBalanceSegments//////////////////////////////////////////
// LoadBalanceSegmentsMsg is a message pack that contains load balance segments request
type LoadBalanceSegmentsMsg struct {
BaseMsg
internalpb.LoadBalanceSegmentsRequest
@ -813,18 +937,22 @@ type LoadBalanceSegmentsMsg struct {
// interface implementation validation
var _ TsMsg = &LoadBalanceSegmentsMsg{}
// ID returns the ID of this message pack
func (l *LoadBalanceSegmentsMsg) ID() UniqueID {
return l.Base.MsgID
}
// Type returns the type of this message pack
func (l *LoadBalanceSegmentsMsg) Type() MsgType {
return l.Base.MsgType
}
// SourceID indicated which component generated this message
func (l *LoadBalanceSegmentsMsg) SourceID() int64 {
return l.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (l *LoadBalanceSegmentsMsg) Marshal(input TsMsg) (MarshalType, error) {
load := input.(*LoadBalanceSegmentsMsg)
loadReq := &load.LoadBalanceSegmentsRequest
@ -835,9 +963,10 @@ func (l *LoadBalanceSegmentsMsg) Marshal(input TsMsg) (MarshalType, error) {
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (l *LoadBalanceSegmentsMsg) Unmarshal(input MarshalType) (TsMsg, error) {
loadReq := internalpb.LoadBalanceSegmentsRequest{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
@ -852,6 +981,7 @@ func (l *LoadBalanceSegmentsMsg) Unmarshal(input MarshalType) (TsMsg, error) {
return loadMsg, nil
}
// DataNodeTtMsg is a message pack that contains datanode time tick
type DataNodeTtMsg struct {
BaseMsg
datapb.DataNodeTtMsg
@ -860,18 +990,22 @@ type DataNodeTtMsg struct {
// interface implementation validation
var _ TsMsg = &DataNodeTtMsg{}
// ID returns the ID of this message pack
func (m *DataNodeTtMsg) ID() UniqueID {
return m.Base.MsgID
}
// Type returns the type of this message pack
func (m *DataNodeTtMsg) Type() MsgType {
return m.Base.MsgType
}
// SourceID indicated which component generated this message
func (m *DataNodeTtMsg) SourceID() int64 {
return m.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (m *DataNodeTtMsg) Marshal(input TsMsg) (MarshalType, error) {
msg := input.(*DataNodeTtMsg)
t, err := proto.Marshal(&msg.DataNodeTtMsg)
@ -881,9 +1015,10 @@ func (m *DataNodeTtMsg) Marshal(input TsMsg) (MarshalType, error) {
return t, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (m *DataNodeTtMsg) Unmarshal(input MarshalType) (TsMsg, error) {
msg := datapb.DataNodeTtMsg{}
in, err := ConvertToByteArray(input)
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}

View File

@ -49,17 +49,17 @@ func TestBaseMsg(t *testing.T) {
assert.Equal(t, position, baseMsg.Position())
}
func Test_ConvertToByteArray(t *testing.T) {
func Test_convertToByteArray(t *testing.T) {
{
bytes := []byte{1, 2, 3}
byteArray, err := ConvertToByteArray(bytes)
byteArray, err := convertToByteArray(bytes)
assert.Equal(t, bytes, byteArray)
assert.Nil(t, err)
}
{
bytes := 4
byteArray, err := ConvertToByteArray(bytes)
byteArray, err := convertToByteArray(bytes)
assert.Equal(t, ([]byte)(nil), byteArray)
assert.NotNil(t, err)
}

View File

@ -70,7 +70,7 @@ func Test_ProtoUnmarshalDispatcher(t *testing.T) {
headerMsg := commonpb.MsgHeader{}
payload, err := v.Marshal(v)
assert.Nil(t, err)
p, err := ConvertToByteArray(payload)
p, err := convertToByteArray(payload)
assert.Nil(t, err)
err = proto.Unmarshal(p, &headerMsg)
assert.Nil(t, err)