Delete wrong useage of *TsMsg

Signed-off-by: xige-16 <xi.ge@zilliz.com>
pull/4973/head^2
xige-16 2020-11-17 14:10:07 +08:00 committed by yefu.chen
parent a4270500f0
commit 4e8efe85df
23 changed files with 137 additions and 162 deletions

View File

@ -37,15 +37,10 @@ jobs:
- name: Get version from system time after release step
id: extracter
run: echo "::set-output name=version::$(date +%Y%m%d-%H%M%S)"
- name: Docker Pull
shell: bash
run: |
DATE_VERSION=latest docker-compose pull --ignore-pull-failures ubuntu
- name: Docker Build
shell: bash
run: |
DATE_VERSION=${{ steps.extracter.outputs.version }} docker-compose build ubuntu
DATE_VERSION=latest docker-compose build ubuntu
- name: Docker Push
if: success() && github.event_name == 'push' && github.repository == 'zilliztech/milvus-distributed'
continue-on-error: true
@ -54,4 +49,3 @@ jobs:
docker login -u ${{ secrets.DOCKERHUB_USER }} \
-p ${{ secrets.DOCKERHUB_TOKEN }}
DATE_VERSION=${{ steps.extracter.outputs.version }} docker-compose push ubuntu
DATE_VERSION=latest docker-compose push ubuntu

View File

@ -66,7 +66,7 @@ func receiveMsg(stream *ms.MsgStream) []uint64 {
if len(result.Msgs) > 0 {
msgs := result.Msgs
for _, v := range msgs {
timetickmsg := (*v).(*ms.TimeTickMsg)
timetickmsg := v.(*ms.TimeTickMsg)
results = append(results, timetickmsg.TimeTickMsg.Timestamp)
receiveCount++
if receiveCount == 10 {

View File

@ -71,9 +71,7 @@ func (syncMsgProducer *timeSyncMsgProducer) broadcastMsg(barrier TimeTickBarrier
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
var tsMsg ms.TsMsg
tsMsg = timeTickMsg
msgPack.Msgs = append(msgPack.Msgs, &tsMsg)
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
err = stream.Broadcast(&msgPack)
if err != nil {
return err

View File

@ -71,7 +71,7 @@ func (ttBarrier *softTimeTickBarrier) Start() error {
case ttmsgs := <-ttBarrier.ttStream.Chan():
if len(ttmsgs.Msgs) > 0 {
for _, timetickmsg := range ttmsgs.Msgs {
ttmsg := (*timetickmsg).(*ms.TimeTickMsg)
ttmsg := timetickmsg.(*ms.TimeTickMsg)
oldT, ok := ttBarrier.peer2LastTt[ttmsg.PeerID]
log.Printf("[softTimeTickBarrier] peer(%d)=%d\n", ttmsg.PeerID, ttmsg.Timestamp)
@ -188,7 +188,7 @@ func (ttBarrier *hardTimeTickBarrier) Start() error {
// Suppose ttmsg.Timestamp from stream is always larger than the previous one,
// that `ttmsg.Timestamp > oldT`
ttmsg := (*timetickmsg).(*ms.TimeTickMsg)
ttmsg := timetickmsg.(*ms.TimeTickMsg)
log.Printf("[hardTimeTickBarrier] peer(%d)=%d\n", ttmsg.PeerID, ttmsg.Timestamp)
oldT, ok := ttBarrier.peer2Tt[ttmsg.PeerID]

View File

@ -14,8 +14,7 @@ import (
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
func getTtMsg(msgType internalPb.MsgType, peerID UniqueID, timeStamp uint64) *ms.TsMsg {
var tsMsg ms.TsMsg
func getTtMsg(msgType internalPb.MsgType, peerID UniqueID, timeStamp uint64) ms.TsMsg {
baseMsg := ms.BaseMsg{
HashValues: []int32{int32(peerID)},
}
@ -28,8 +27,8 @@ func getTtMsg(msgType internalPb.MsgType, peerID UniqueID, timeStamp uint64) *ms
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
tsMsg = timeTickMsg
return &tsMsg
return timeTickMsg
}
func initPulsarStream(pulsarAddress string,

View File

@ -21,10 +21,10 @@ type IntPrimaryKey = typeutil.IntPrimaryKey
type MsgPack struct {
BeginTs Timestamp
EndTs Timestamp
Msgs []*TsMsg
Msgs []TsMsg
}
type RepackFunc func(msgs []*TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)
type RepackFunc func(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error)
type MsgStream interface {
Start()
@ -138,7 +138,7 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
}
reBucketValues := make([][]int32, len(tsMsgs))
for channelID, tsMsg := range tsMsgs {
hashValues := (*tsMsg).HashKeys()
hashValues := tsMsg.HashKeys()
bucketValues := make([]int32, len(hashValues))
for index, hashValue := range hashValues {
bucketValues[index] = hashValue % int32(len(ms.producers))
@ -151,7 +151,7 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
if ms.repackFunc != nil {
result, err = ms.repackFunc(tsMsgs, reBucketValues)
} else {
msgType := (*tsMsgs[0]).Type()
msgType := (tsMsgs[0]).Type()
switch msgType {
case internalPb.MsgType_kInsert:
result, err = insertRepackFunc(tsMsgs, reBucketValues)
@ -166,7 +166,7 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
}
for k, v := range result {
for i := 0; i < len(v.Msgs); i++ {
mb, err := (*v.Msgs[i]).Marshal(v.Msgs[i])
mb, err := v.Msgs[i].Marshal(v.Msgs[i])
if err != nil {
return err
}
@ -184,7 +184,7 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error {
producerLen := len(ms.producers)
for _, v := range msgPack.Msgs {
mb, err := (*v).Marshal(v)
mb, err := v.Marshal(v)
if err != nil {
return err
}
@ -223,7 +223,7 @@ func (ms *PulsarMsgStream) bufMsgPackToChannel() {
case <-ms.ctx.Done():
return
default:
tsMsgList := make([]*TsMsg, 0)
tsMsgList := make([]TsMsg, 0)
for i := 0; i < len(ms.consumers); i++ {
consumerChan := (*ms.consumers[i]).Chan()
chanLen := len(consumerChan)
@ -263,8 +263,8 @@ func (ms *PulsarMsgStream) Chan() <-chan *MsgPack {
type PulsarTtMsgStream struct {
PulsarMsgStream
inputBuf []*TsMsg
unsolvedBuf []*TsMsg
inputBuf []TsMsg
unsolvedBuf []TsMsg
lastTimeStamp Timestamp
}
@ -290,8 +290,8 @@ func (ms *PulsarTtMsgStream) Start() {
func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
defer ms.wait.Done()
ms.unsolvedBuf = make([]*TsMsg, 0)
ms.inputBuf = make([]*TsMsg, 0)
ms.unsolvedBuf = make([]TsMsg, 0)
ms.inputBuf = make([]TsMsg, 0)
for {
select {
case <-ms.ctx.Done():
@ -310,11 +310,11 @@ func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
log.Printf("All timeTick's timestamps are inconsistent")
}
timeTickBuf := make([]*TsMsg, 0)
timeTickBuf := make([]TsMsg, 0)
ms.inputBuf = append(ms.inputBuf, ms.unsolvedBuf...)
ms.unsolvedBuf = ms.unsolvedBuf[:0]
for _, v := range ms.inputBuf {
if (*v).EndTs() <= timeStamp {
if v.EndTs() <= timeStamp {
timeTickBuf = append(timeTickBuf, v)
} else {
ms.unsolvedBuf = append(ms.unsolvedBuf, v)
@ -362,7 +362,7 @@ func (ms *PulsarTtMsgStream) findTimeTick(channelIndex int,
log.Printf("Failed to unmarshal, error = %v", err)
}
if headerMsg.MsgType == internalPb.MsgType_kTimeTick {
eofMsgMap[channelIndex] = (*tsMsg).(*TimeTickMsg).Timestamp
eofMsgMap[channelIndex] = tsMsg.(*TimeTickMsg).Timestamp
return
}
mu.Lock()
@ -385,13 +385,13 @@ func checkTimeTickMsg(msg map[int]Timestamp) (Timestamp, bool) {
return 0, false
}
func insertRepackFunc(tsMsgs []*TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
func insertRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {
if (*request).Type() != internalPb.MsgType_kInsert {
if request.Type() != internalPb.MsgType_kInsert {
return nil, errors.New(string("msg's must be Insert"))
}
insertRequest := (*request).(*InsertMsg)
insertRequest := request.(*InsertMsg)
keys := hashKeys[i]
timestampLen := len(insertRequest.Timestamps)
@ -422,23 +422,22 @@ func insertRepackFunc(tsMsgs []*TsMsg, hashKeys [][]int32) (map[int32]*MsgPack,
RowData: []*commonPb.Blob{insertRequest.RowData[index]},
}
var msg TsMsg = &InsertMsg{
insertMsg := &InsertMsg{
InsertRequest: sliceRequest,
}
result[key].Msgs = append(result[key].Msgs, &msg)
result[key].Msgs = append(result[key].Msgs, insertMsg)
}
}
return result, nil
}
func deleteRepackFunc(tsMsgs []*TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
func deleteRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {
if (*request).Type() != internalPb.MsgType_kDelete {
if request.Type() != internalPb.MsgType_kDelete {
return nil, errors.New(string("msg's must be Delete"))
}
deleteRequest := (*request).(*DeleteMsg)
deleteRequest := request.(*DeleteMsg)
keys := hashKeys[i]
timestampLen := len(deleteRequest.Timestamps)
@ -466,17 +465,16 @@ func deleteRepackFunc(tsMsgs []*TsMsg, hashKeys [][]int32) (map[int32]*MsgPack,
PrimaryKeys: []int64{deleteRequest.PrimaryKeys[index]},
}
var msg TsMsg = &DeleteMsg{
deleteMsg := &DeleteMsg{
DeleteRequest: sliceRequest,
}
result[key].Msgs = append(result[key].Msgs, &msg)
result[key].Msgs = append(result[key].Msgs, deleteMsg)
}
}
return result, nil
}
func defaultRepackFunc(tsMsgs []*TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
func defaultRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {
keys := hashKeys[i]
@ -489,7 +487,6 @@ func defaultRepackFunc(tsMsgs []*TsMsg, hashKeys [][]int32) (map[int32]*MsgPack,
msgPack := MsgPack{}
result[key] = &msgPack
}
result[key].Msgs = append(result[key].Msgs, request)
}
return result, nil

View File

@ -10,7 +10,7 @@ import (
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
func repackFunc(msgs []*TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
func repackFunc(msgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range msgs {
keys := hashKeys[i]
@ -26,8 +26,7 @@ func repackFunc(msgs []*TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
return result, nil
}
func getTsMsg(msgType MsgType, reqID UniqueID, hashValue int32) *TsMsg {
var tsMsg TsMsg
func getTsMsg(msgType MsgType, reqID UniqueID, hashValue int32) TsMsg {
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
@ -51,7 +50,7 @@ func getTsMsg(msgType MsgType, reqID UniqueID, hashValue int32) *TsMsg {
BaseMsg: baseMsg,
InsertRequest: insertRequest,
}
tsMsg = insertMsg
return insertMsg
case internalPb.MsgType_kDelete:
deleteRequest := internalPb.DeleteRequest{
MsgType: internalPb.MsgType_kDelete,
@ -66,7 +65,7 @@ func getTsMsg(msgType MsgType, reqID UniqueID, hashValue int32) *TsMsg {
BaseMsg: baseMsg,
DeleteRequest: deleteRequest,
}
tsMsg = deleteMsg
return deleteMsg
case internalPb.MsgType_kSearch:
searchRequest := internalPb.SearchRequest{
MsgType: internalPb.MsgType_kSearch,
@ -79,7 +78,7 @@ func getTsMsg(msgType MsgType, reqID UniqueID, hashValue int32) *TsMsg {
BaseMsg: baseMsg,
SearchRequest: searchRequest,
}
tsMsg = searchMsg
return searchMsg
case internalPb.MsgType_kSearchResult:
searchResult := internalPb.SearchResult{
MsgType: internalPb.MsgType_kSearchResult,
@ -94,7 +93,7 @@ func getTsMsg(msgType MsgType, reqID UniqueID, hashValue int32) *TsMsg {
BaseMsg: baseMsg,
SearchResult: searchResult,
}
tsMsg = searchResultMsg
return searchResultMsg
case internalPb.MsgType_kTimeTick:
timeTickResult := internalPb.TimeTickMsg{
MsgType: internalPb.MsgType_kTimeTick,
@ -105,7 +104,7 @@ func getTsMsg(msgType MsgType, reqID UniqueID, hashValue int32) *TsMsg {
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
tsMsg = timeTickMsg
return timeTickMsg
case internalPb.MsgType_kQueryNodeSegStats:
queryNodeSegStats := internalPb.QueryNodeSegStats{
MsgType: internalPb.MsgType_kQueryNodeSegStats,
@ -115,13 +114,12 @@ func getTsMsg(msgType MsgType, reqID UniqueID, hashValue int32) *TsMsg {
BaseMsg: baseMsg,
QueryNodeSegStats: queryNodeSegStats,
}
tsMsg = queryNodeSegStatsMsg
return queryNodeSegStatsMsg
}
return &tsMsg
return nil
}
func getTimeTickMsg(reqID UniqueID, hashValue int32, time uint64) *TsMsg {
var tsMsg TsMsg
func getTimeTickMsg(reqID UniqueID, hashValue int32, time uint64) TsMsg {
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
@ -136,8 +134,7 @@ func getTimeTickMsg(reqID UniqueID, hashValue int32, time uint64) *TsMsg {
BaseMsg: baseMsg,
TimeTickMsg: timeTickResult,
}
tsMsg = timeTickMsg
return &tsMsg
return timeTickMsg
}
func initPulsarStream(pulsarAddress string,
@ -202,7 +199,7 @@ func receiveMsg(outputStream *MsgStream, msgCount int) {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v)
fmt.Println("msg type: ", v.Type(), ", msg value: ", v)
}
}
if receiveCount >= msgCount {
@ -381,9 +378,9 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
BaseMsg: baseMsg,
InsertRequest: insertRequest,
}
var tsMsg TsMsg = insertMsg
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, &tsMsg)
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
@ -431,9 +428,9 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
BaseMsg: baseMsg,
DeleteRequest: deleteRequest,
}
var tsMsg TsMsg = deleteMsg
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, &tsMsg)
msgPack.Msgs = append(msgPack.Msgs, deleteMsg)
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)

View File

@ -12,8 +12,8 @@ type TsMsg interface {
EndTs() Timestamp
Type() MsgType
HashKeys() []int32
Marshal(*TsMsg) ([]byte, error)
Unmarshal([]byte) (*TsMsg, error)
Marshal(TsMsg) ([]byte, error)
Unmarshal([]byte) (TsMsg, error)
}
type BaseMsg struct {
@ -44,8 +44,8 @@ func (it *InsertMsg) Type() MsgType {
return it.MsgType
}
func (it *InsertMsg) Marshal(input *TsMsg) ([]byte, error) {
insertMsg := (*input).(*InsertMsg)
func (it *InsertMsg) Marshal(input TsMsg) ([]byte, error) {
insertMsg := input.(*InsertMsg)
insertRequest := &insertMsg.InsertRequest
mb, err := proto.Marshal(insertRequest)
if err != nil {
@ -54,7 +54,7 @@ func (it *InsertMsg) Marshal(input *TsMsg) ([]byte, error) {
return mb, nil
}
func (it *InsertMsg) Unmarshal(input []byte) (*TsMsg, error) {
func (it *InsertMsg) Unmarshal(input []byte) (TsMsg, error) {
insertRequest := internalPb.InsertRequest{}
err := proto.Unmarshal(input, &insertRequest)
if err != nil {
@ -75,8 +75,7 @@ func (it *InsertMsg) Unmarshal(input []byte) (*TsMsg, error) {
}
}
var tsMsg TsMsg = insertMsg
return &tsMsg, nil
return insertMsg, nil
}
/////////////////////////////////////////Delete//////////////////////////////////////////
@ -89,8 +88,8 @@ func (dt *DeleteMsg) Type() MsgType {
return dt.MsgType
}
func (dt *DeleteMsg) Marshal(input *TsMsg) ([]byte, error) {
deleteTask := (*input).(*DeleteMsg)
func (dt *DeleteMsg) Marshal(input TsMsg) ([]byte, error) {
deleteTask := input.(*DeleteMsg)
deleteRequest := &deleteTask.DeleteRequest
mb, err := proto.Marshal(deleteRequest)
if err != nil {
@ -99,7 +98,7 @@ func (dt *DeleteMsg) Marshal(input *TsMsg) ([]byte, error) {
return mb, nil
}
func (dt *DeleteMsg) Unmarshal(input []byte) (*TsMsg, error) {
func (dt *DeleteMsg) Unmarshal(input []byte) (TsMsg, error) {
deleteRequest := internalPb.DeleteRequest{}
err := proto.Unmarshal(input, &deleteRequest)
if err != nil {
@ -120,8 +119,7 @@ func (dt *DeleteMsg) Unmarshal(input []byte) (*TsMsg, error) {
}
}
var tsMsg TsMsg = deleteMsg
return &tsMsg, nil
return deleteMsg, nil
}
/////////////////////////////////////////Search//////////////////////////////////////////
@ -134,8 +132,8 @@ func (st *SearchMsg) Type() MsgType {
return st.MsgType
}
func (st *SearchMsg) Marshal(input *TsMsg) ([]byte, error) {
searchTask := (*input).(*SearchMsg)
func (st *SearchMsg) Marshal(input TsMsg) ([]byte, error) {
searchTask := input.(*SearchMsg)
searchRequest := &searchTask.SearchRequest
mb, err := proto.Marshal(searchRequest)
if err != nil {
@ -144,7 +142,7 @@ func (st *SearchMsg) Marshal(input *TsMsg) ([]byte, error) {
return mb, nil
}
func (st *SearchMsg) Unmarshal(input []byte) (*TsMsg, error) {
func (st *SearchMsg) Unmarshal(input []byte) (TsMsg, error) {
searchRequest := internalPb.SearchRequest{}
err := proto.Unmarshal(input, &searchRequest)
if err != nil {
@ -154,8 +152,7 @@ func (st *SearchMsg) Unmarshal(input []byte) (*TsMsg, error) {
searchMsg.BeginTimestamp = searchMsg.Timestamp
searchMsg.EndTimestamp = searchMsg.Timestamp
var tsMsg TsMsg = searchMsg
return &tsMsg, nil
return searchMsg, nil
}
/////////////////////////////////////////SearchResult//////////////////////////////////////////
@ -168,8 +165,8 @@ func (srt *SearchResultMsg) Type() MsgType {
return srt.MsgType
}
func (srt *SearchResultMsg) Marshal(input *TsMsg) ([]byte, error) {
searchResultTask := (*input).(*SearchResultMsg)
func (srt *SearchResultMsg) Marshal(input TsMsg) ([]byte, error) {
searchResultTask := input.(*SearchResultMsg)
searchResultRequest := &searchResultTask.SearchResult
mb, err := proto.Marshal(searchResultRequest)
if err != nil {
@ -178,7 +175,7 @@ func (srt *SearchResultMsg) Marshal(input *TsMsg) ([]byte, error) {
return mb, nil
}
func (srt *SearchResultMsg) Unmarshal(input []byte) (*TsMsg, error) {
func (srt *SearchResultMsg) Unmarshal(input []byte) (TsMsg, error) {
searchResultRequest := internalPb.SearchResult{}
err := proto.Unmarshal(input, &searchResultRequest)
if err != nil {
@ -188,8 +185,7 @@ func (srt *SearchResultMsg) Unmarshal(input []byte) (*TsMsg, error) {
searchResultMsg.BeginTimestamp = searchResultMsg.Timestamp
searchResultMsg.EndTimestamp = searchResultMsg.Timestamp
var tsMsg TsMsg = searchResultMsg
return &tsMsg, nil
return searchResultMsg, nil
}
/////////////////////////////////////////TimeTick//////////////////////////////////////////
@ -202,8 +198,8 @@ func (tst *TimeTickMsg) Type() MsgType {
return tst.MsgType
}
func (tst *TimeTickMsg) Marshal(input *TsMsg) ([]byte, error) {
timeTickTask := (*input).(*TimeTickMsg)
func (tst *TimeTickMsg) Marshal(input TsMsg) ([]byte, error) {
timeTickTask := input.(*TimeTickMsg)
timeTick := &timeTickTask.TimeTickMsg
mb, err := proto.Marshal(timeTick)
if err != nil {
@ -212,7 +208,7 @@ func (tst *TimeTickMsg) Marshal(input *TsMsg) ([]byte, error) {
return mb, nil
}
func (tst *TimeTickMsg) Unmarshal(input []byte) (*TsMsg, error) {
func (tst *TimeTickMsg) Unmarshal(input []byte) (TsMsg, error) {
timeTickMsg := internalPb.TimeTickMsg{}
err := proto.Unmarshal(input, &timeTickMsg)
if err != nil {
@ -222,8 +218,7 @@ func (tst *TimeTickMsg) Unmarshal(input []byte) (*TsMsg, error) {
timeTick.BeginTimestamp = timeTick.Timestamp
timeTick.EndTimestamp = timeTick.Timestamp
var tsMsg TsMsg = timeTick
return &tsMsg, nil
return timeTick, nil
}
/////////////////////////////////////////QueryNodeSegStats//////////////////////////////////////////
@ -236,8 +231,8 @@ func (qs *QueryNodeSegStatsMsg) Type() MsgType {
return qs.MsgType
}
func (qs *QueryNodeSegStatsMsg) Marshal(input *TsMsg) ([]byte, error) {
queryNodeSegStatsTask := (*input).(*QueryNodeSegStatsMsg)
func (qs *QueryNodeSegStatsMsg) Marshal(input TsMsg) ([]byte, error) {
queryNodeSegStatsTask := input.(*QueryNodeSegStatsMsg)
queryNodeSegStats := &queryNodeSegStatsTask.QueryNodeSegStats
mb, err := proto.Marshal(queryNodeSegStats)
if err != nil {
@ -246,7 +241,7 @@ func (qs *QueryNodeSegStatsMsg) Marshal(input *TsMsg) ([]byte, error) {
return mb, nil
}
func (qs *QueryNodeSegStatsMsg) Unmarshal(input []byte) (*TsMsg, error) {
func (qs *QueryNodeSegStatsMsg) Unmarshal(input []byte) (TsMsg, error) {
queryNodeSegStats := internalPb.QueryNodeSegStats{}
err := proto.Unmarshal(input, &queryNodeSegStats)
if err != nil {
@ -254,8 +249,7 @@ func (qs *QueryNodeSegStatsMsg) Unmarshal(input []byte) (*TsMsg, error) {
}
queryNodeSegStatsMsg := &QueryNodeSegStatsMsg{QueryNodeSegStats: queryNodeSegStats}
var tsMsg TsMsg = queryNodeSegStatsMsg
return &tsMsg, nil
return queryNodeSegStatsMsg, nil
}
///////////////////////////////////////////Key2Seg//////////////////////////////////////////

View File

@ -18,8 +18,8 @@ type InsertTask struct {
InsertMsg
}
func (tt *InsertTask) Marshal(input *TsMsg) ([]byte, error) {
testMsg := (*input).(*InsertTask)
func (tt *InsertTask) Marshal(input TsMsg) ([]byte, error) {
testMsg := input.(*InsertTask)
insertRequest := &testMsg.InsertRequest
mb, err := proto.Marshal(insertRequest)
if err != nil {
@ -28,26 +28,25 @@ func (tt *InsertTask) Marshal(input *TsMsg) ([]byte, error) {
return mb, nil
}
func (tt *InsertTask) Unmarshal(input []byte) (*TsMsg, error) {
func (tt *InsertTask) Unmarshal(input []byte) (TsMsg, error) {
insertRequest := internalPb.InsertRequest{}
err := proto.Unmarshal(input, &insertRequest)
testMsg := &InsertTask{InsertMsg: InsertMsg{InsertRequest: insertRequest}}
testMsg.Tag = testMsg.PartitionTag
if err != nil {
return nil, err
}
var tsMsg TsMsg = testMsg
return &tsMsg, nil
return testMsg, nil
}
func newRepackFunc(tsMsgs []*TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
func newRepackFunc(tsMsgs []TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, error) {
result := make(map[int32]*MsgPack)
for i, request := range tsMsgs {
if (*request).Type() != internalPb.MsgType_kInsert {
if request.Type() != internalPb.MsgType_kInsert {
return nil, errors.New(string("msg's must be Insert"))
}
insertRequest := (*request).(*InsertTask).InsertRequest
insertRequest := request.(*InsertTask).InsertRequest
keys := hashKeys[i]
timestampLen := len(insertRequest.Timestamps)
@ -78,18 +77,17 @@ func newRepackFunc(tsMsgs []*TsMsg, hashKeys [][]int32) (map[int32]*MsgPack, err
RowData: []*commonPb.Blob{insertRequest.RowData[index]},
}
var msg TsMsg = &InsertTask{
insertMsg := &InsertTask{
InsertMsg: InsertMsg{InsertRequest: sliceRequest},
}
result[key].Msgs = append(result[key].Msgs, &msg)
result[key].Msgs = append(result[key].Msgs, insertMsg)
}
}
return result, nil
}
func getMsg(reqID UniqueID, hashValue int32) *TsMsg {
var tsMsg TsMsg
func getInsertTask(reqID UniqueID, hashValue int32) TsMsg {
baseMsg := BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
@ -112,11 +110,11 @@ func getMsg(reqID UniqueID, hashValue int32) *TsMsg {
InsertRequest: insertRequest,
}
testTask := InsertTask{
testTask := &InsertTask{
InsertMsg: insertMsg,
}
tsMsg = &testTask
return &tsMsg
return testTask
}
func TestStream_task_Insert(t *testing.T) {
@ -126,8 +124,8 @@ func TestStream_task_Insert(t *testing.T) {
consumerSubName := "subInsert"
msgPack := MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, getMsg(1, 1))
msgPack.Msgs = append(msgPack.Msgs, getMsg(3, 3))
msgPack.Msgs = append(msgPack.Msgs, getInsertTask(1, 1))
msgPack.Msgs = append(msgPack.Msgs, getInsertTask(3, 3))
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
@ -154,7 +152,7 @@ func TestStream_task_Insert(t *testing.T) {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v, "msg tag: ", (*v).(*InsertTask).Tag)
fmt.Println("msg type: ", v.Type(), ", msg value: ", v, "msg tag: ", v.(*InsertTask).Tag)
}
}
if receiveCount >= len(msgPack.Msgs) {

View File

@ -5,14 +5,14 @@ import (
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
type MarshalFunc func(*TsMsg) ([]byte, error)
type UnmarshalFunc func([]byte) (*TsMsg, error)
type MarshalFunc func(TsMsg) ([]byte, error)
type UnmarshalFunc func([]byte) (TsMsg, error)
type UnmarshalDispatcher struct {
tempMap map[internalPb.MsgType]UnmarshalFunc
}
func (dispatcher *UnmarshalDispatcher) Unmarshal(input []byte, msgType internalPb.MsgType) (*TsMsg, error) {
func (dispatcher *UnmarshalDispatcher) Unmarshal(input []byte, msgType internalPb.MsgType) (TsMsg, error) {
unmarshalFunc, ok := dispatcher.tempMap[msgType]
if !ok {
return nil, errors.New(string("Not set unmarshalFunc for this messageType."))

View File

@ -10,17 +10,16 @@ import (
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
func newInsertMsgUnmarshal(input []byte) (*TsMsg, error) {
func newInsertMsgUnmarshal(input []byte) (TsMsg, error) {
insertRequest := internalPb.InsertRequest{}
err := proto.Unmarshal(input, &insertRequest)
insertMsg := &InsertMsg{InsertRequest: insertRequest}
fmt.Println("use func newInsertMsgUnmarshal unmarshal")
if err != nil {
return nil, err
}
var tsMsg TsMsg = insertMsg
return &tsMsg, nil
return insertMsg, nil
}
func TestStream_unmarshal_Insert(t *testing.T) {
@ -59,7 +58,7 @@ func TestStream_unmarshal_Insert(t *testing.T) {
msgs := result.Msgs
for _, v := range msgs {
receiveCount++
fmt.Println("msg type: ", (*v).Type(), ", msg value: ", *v, "msg tag: ")
fmt.Println("msg type: ", v.Type(), ", msg value: ", v, "msg tag: ")
}
}
if receiveCount >= len(msgPack.Msgs) {

View File

@ -187,7 +187,7 @@ func (p *Proxy) queryResultLoop() {
continue
}
tsMsg := msgPack.Msgs[0]
searchResultMsg, _ := (*tsMsg).(*msgstream.SearchResultMsg)
searchResultMsg, _ := tsMsg.(*msgstream.SearchResultMsg)
reqID := searchResultMsg.GetReqID()
_, ok = queryResultBuf[reqID]
if !ok {

View File

@ -297,10 +297,9 @@ func TestProxy_Search(t *testing.T) {
},
}
msgPack := &msgstream.MsgPack{
Msgs: make([]*msgstream.TsMsg, 1),
Msgs: make([]msgstream.TsMsg, 1),
}
var tsMsg msgstream.TsMsg = searchResultMsg
msgPack.Msgs[0] = &tsMsg
msgPack.Msgs[0] = searchResultMsg
queryResultMsgStream.Produce(msgPack)
}
i++

View File

@ -66,9 +66,9 @@ func (it *InsertTask) Execute() error {
msgPack := &msgstream.MsgPack{
BeginTs: it.BeginTs(),
EndTs: it.EndTs(),
Msgs: make([]*msgstream.TsMsg, 1),
Msgs: make([]msgstream.TsMsg, 1),
}
msgPack.Msgs[0] = &tsMsg
msgPack.Msgs[0] = tsMsg
err := it.manipulationMsgStream.Produce(msgPack)
it.result = &servicepb.IntegerRangeResponse{
Status: &commonpb.Status{
@ -281,9 +281,9 @@ func (qt *QueryTask) Execute() error {
msgPack := &msgstream.MsgPack{
BeginTs: qt.Timestamp,
EndTs: qt.Timestamp,
Msgs: make([]*msgstream.TsMsg, 1),
Msgs: make([]msgstream.TsMsg, 1),
}
msgPack.Msgs[0] = &tsMsg
msgPack.Msgs[0] = tsMsg
qt.queryMsgStream.Produce(msgPack)
return nil
}

View File

@ -74,14 +74,14 @@ func (tt *timeTick) tick() error {
return nil
}
msgPack := msgstream.MsgPack{}
var timeTickMsg msgstream.TsMsg = &msgstream.TimeTickMsg{
timeTickMsg := &msgstream.TimeTickMsg{
TimeTickMsg: internalpb.TimeTickMsg{
MsgType: internalpb.MsgType_kTimeTick,
PeerID: tt.peerID,
Timestamp: tt.currentTick,
},
}
msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
tt.tickMsgStream.Produce(&msgPack)
tt.lastTick = tt.currentTick
return nil

View File

@ -121,7 +121,7 @@ func TestManipulationService_Start(t *testing.T) {
}
// messages generate
insertMessages := make([]*msgstream.TsMsg, 0)
insertMessages := make([]msgstream.TsMsg, 0)
for i := 0; i < msgLength; i++ {
var msg msgstream.TsMsg = &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
@ -145,7 +145,7 @@ func TestManipulationService_Start(t *testing.T) {
},
},
}
insertMessages = append(insertMessages, &msg)
insertMessages = append(insertMessages, msg)
}
msgPack := msgstream.MsgPack{

View File

@ -38,14 +38,14 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
timestampMax: msMsg.TimestampMax(),
},
}
for _, msg := range *msMsg.TsMessages() {
switch (*msg).Type() {
for _, msg := range msMsg.TsMessages() {
switch msg.Type() {
case internalPb.MsgType_kInsert:
iMsg.insertMessages = append(iMsg.insertMessages, (*msg).(*msgstream.InsertMsg))
iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg))
// case internalPb.MsgType_kDelete:
// dmMsg.deleteMessages = append(dmMsg.deleteMessages, (*msg).(*msgstream.DeleteTask))
default:
log.Println("Non supporting message type:", (*msg).Type())
log.Println("Non supporting message type:", msg.Type())
}
}

View File

@ -9,7 +9,7 @@ type Msg = flowgraph.Msg
type MsgStreamMsg = flowgraph.MsgStreamMsg
type key2SegMsg struct {
tsMessages []*msgstream.TsMsg
tsMessages []msgstream.TsMsg
timeRange TimeRange
}

View File

@ -28,7 +28,7 @@ func (plan *Plan) GetTopK() int64 {
return int64(topK)
}
func (plan *Plan) DeletePlan() {
func (plan *Plan) Delete() {
C.DeletePlan(plan.cPlan)
}
@ -49,6 +49,6 @@ func (pg *PlaceholderGroup) GetNumOfQuery() int64 {
return int64(numQueries)
}
func (pg *PlaceholderGroup) DeletePlaceholderGroup() {
func (pg *PlaceholderGroup) Delete() {
C.DeletePlaceholderGroup(pg.cPlaceholderGroup)
}

View File

@ -19,7 +19,7 @@ type searchService struct {
ctx context.Context
cancel context.CancelFunc
container container
node *QueryNode
searchMsgStream *msgstream.MsgStream
searchResultMsgStream *msgstream.MsgStream
}
@ -57,7 +57,7 @@ func newSearchService(ctx context.Context, node *QueryNode, pulsarURL string) *s
ctx: searchServiceCtx,
cancel: searchServiceCancel,
container: *node.container,
node: node,
searchMsgStream: &inputStream,
searchResultMsgStream: &outputStream,
}
@ -68,7 +68,7 @@ func (ss *searchService) start() {
(*ss.searchResultMsgStream).Start()
go func() {
for {
for {
select {
case <-ss.ctx.Done():
return
@ -95,7 +95,7 @@ func (ss *searchService) close() {
ss.cancel()
}
func (ss *searchService) search(searchMessages []*msgstream.TsMsg) error {
func (ss *searchService) search(searchMessages []msgstream.TsMsg) error {
type SearchResult struct {
ResultID int64
@ -104,9 +104,9 @@ func (ss *searchService) search(searchMessages []*msgstream.TsMsg) error {
// TODO:: cache map[dsl]plan
// TODO: reBatched search requests
for _, msg := range searchMessages {
searchMsg, ok := (*msg).(*msgstream.SearchMsg)
searchMsg, ok := msg.(*msgstream.SearchMsg)
if !ok {
return errors.New("invalid request type = " + string((*msg).Type()))
return errors.New("invalid request type = " + string(msg.Type()))
}
searchTimestamp := searchMsg.Timestamp
@ -120,7 +120,7 @@ func (ss *searchService) search(searchMessages []*msgstream.TsMsg) error {
}
collectionName := query.CollectionName
partitionTags := query.PartitionTags
collection, err := ss.container.getCollectionByName(collectionName)
collection, err := (*ss.node.container).getCollectionByName(collectionName)
if err != nil {
return err
}
@ -150,7 +150,7 @@ func (ss *searchService) search(searchMessages []*msgstream.TsMsg) error {
// 3. Do search in all segments
for _, partitionTag := range partitionTags {
partition, err := ss.container.getPartitionByTag(collectionID, partitionTag)
partition, err := (*ss.node.container).getPartitionByTag(collectionID, partitionTag)
if err != nil {
return err
}
@ -208,13 +208,15 @@ func (ss *searchService) search(searchMessages []*msgstream.TsMsg) error {
}
var tsMsg msgstream.TsMsg = &msgstream.SearchResultMsg{SearchResult: results}
ss.publishSearchResult(&tsMsg)
ss.publishSearchResult(tsMsg)
plan.Delete()
placeholderGroup.Delete()
}
return nil
}
func (ss *searchService) publishSearchResult(res *msgstream.TsMsg) {
func (ss *searchService) publishSearchResult(res msgstream.TsMsg) {
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, res)
(*ss.searchResultMsgStream).Produce(&msgPack)
@ -228,6 +230,6 @@ func (ss *searchService) publishFailedSearchResult() {
var tsMsg msgstream.TsMsg = &msgstream.SearchResultMsg{SearchResult: errorResults}
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, &tsMsg)
msgPack.Msgs = append(msgPack.Msgs, tsMsg)
(*ss.searchResultMsgStream).Produce(&msgPack)
}

View File

@ -115,7 +115,7 @@ func TestSearch_Search(t *testing.T) {
}
// messages generate
insertMessages := make([]*msgstream.TsMsg, 0)
insertMessages := make([]msgstream.TsMsg, 0)
for i := 0; i < msgLength; i++ {
var msg msgstream.TsMsg = &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
@ -139,7 +139,7 @@ func TestSearch_Search(t *testing.T) {
},
},
}
insertMessages = append(insertMessages, &msg)
insertMessages = append(insertMessages, msg)
}
msgPack := msgstream.MsgPack{
@ -211,7 +211,7 @@ func TestSearch_Search(t *testing.T) {
Value: queryByte,
}
searchMsg := msgstream.SearchMsg{
searchMsg := &msgstream.SearchMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []int32{0},
},
@ -225,10 +225,8 @@ func TestSearch_Search(t *testing.T) {
},
}
var tsMsg msgstream.TsMsg = &searchMsg
msgPackSearch := msgstream.MsgPack{}
msgPackSearch.Msgs = append(msgPackSearch.Msgs, &tsMsg)
msgPackSearch.Msgs = append(msgPackSearch.Msgs, searchMsg)
var searchMsgStream msgstream.MsgStream = searchStream
searchMsgStream.Start()

View File

@ -78,7 +78,7 @@ func (sService *statsService) publicStatistic(statistic *internalpb.QueryNodeSeg
}
var msgPack = msgstream.MsgPack{
Msgs: []*msgstream.TsMsg{&msg},
Msgs: []msgstream.TsMsg{msg},
}
err := (*sService.msgStream).Produce(&msgPack)
if err != nil {

View File

@ -8,7 +8,7 @@ type Msg interface {
}
type MsgStreamMsg struct {
tsMessages []*msgstream.TsMsg
tsMessages []msgstream.TsMsg
timestampMin Timestamp
timestampMax Timestamp
}
@ -21,8 +21,8 @@ func (msMsg *MsgStreamMsg) DownStreamNodeIdx() int {
return 0
}
func (msMsg *MsgStreamMsg) TsMessages() *[]*msgstream.TsMsg {
return &msMsg.tsMessages
func (msMsg *MsgStreamMsg) TsMessages() []msgstream.TsMsg {
return msMsg.tsMessages
}
func (msMsg *MsgStreamMsg) TimestampMin() Timestamp {