Add publishRetrieveResult (#5501)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/5516/head
yukun 2021-05-31 18:08:32 +08:00 committed by GitHub
parent 918458a1be
commit 48f35ffa97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 1 deletions

View File

@ -57,7 +57,8 @@ type QueryNode struct {
streaming *streaming
// internal services
searchService *searchService
searchService *searchService
retrieveService *retrieveService
// clients
masterService types.MasterService
@ -79,6 +80,7 @@ func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.F
queryNodeLoopCancel: cancel,
QueryNodeID: queryNodeID,
searchService: nil,
retrieveService: nil,
msFactory: factory,
}
@ -94,6 +96,7 @@ func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *Quer
queryNodeLoopCtx: ctx1,
queryNodeLoopCancel: cancel,
searchService: nil,
retrieveService: nil,
msFactory: factory,
}
@ -191,11 +194,19 @@ func (node *QueryNode) Start() error {
node.streaming.tSafeReplica,
node.msFactory)
node.retrieveService = newRetrieveService(node.queryNodeLoopCtx,
node.historical.replica,
node.streaming.replica,
node.streaming.tSafeReplica,
node.msFactory,
)
// start task scheduler
go node.scheduler.Start()
// start services
go node.searchService.start()
go node.retrieveService.start()
go node.historical.start()
node.UpdateStateCode(internalpb.StateCode_Healthy)
return nil
@ -215,6 +226,9 @@ func (node *QueryNode) Stop() error {
if node.searchService != nil {
node.searchService.close()
}
if node.retrieveService != nil {
node.retrieveService.close()
}
return nil
}

View File

@ -237,9 +237,45 @@ func (rc *retrieveCollection) doUnsolvedMsgRetrieve() {
func (rc *retrieveCollection) retrieve(retrieveMsg *msgstream.RetrieveMsg) error {
// TODO(yukun)
retrieveResultMsg := &msgstream.RetrieveResultMsg{
BaseMsg: msgstream.BaseMsg{Ctx: retrieveMsg.Ctx},
RetrieveResults: internalpb.RetrieveResults{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RetrieveResult,
MsgID: retrieveMsg.Base.MsgID,
SourceID: retrieveMsg.Base.SourceID,
},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
ResultChannelID: retrieveMsg.ResultChannelID,
},
}
err := rc.publishRetrieveResult(retrieveResultMsg, retrieveMsg.CollectionID)
if err != nil {
return err
}
return nil
}
func (rc *retrieveCollection) publishRetrieveResult(msg msgstream.TsMsg, collectionID UniqueID) error {
log.Debug("publishing retrieve result...",
zap.Int64("msgID", msg.ID()),
zap.Int64("collectionID", collectionID))
span, ctx := trace.StartSpanFromContext(msg.TraceCtx())
defer span.Finish()
msg.SetTraceCtx(ctx)
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, msg)
err := rc.retrieveResultMsgStream.Produce(&msgPack)
if err != nil {
log.Error(err.Error())
} else {
log.Debug("publish retrieve result done",
zap.Int64("msgID", msg.ID()),
zap.Int64("collectionID", collectionID))
}
return err
}
func (rc *retrieveCollection) publishFailedRetrieveResult(retrieveMsg *msgstream.RetrieveMsg, errMsg string) error {
span, ctx := trace.StartSpanFromContext(retrieveMsg.TraceCtx())
defer span.Finish()