Update doc: add proxy graph

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
pull/4973/head^2
XuanYang-cn 2021-02-24 09:48:17 +08:00 committed by yefu.chen
parent 3cc071c1b5
commit fd562f9f9c
38 changed files with 290 additions and 155 deletions

View File

@ -2,37 +2,17 @@ package components
import (
"context"
"fmt"
"io"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
grpcindexnode "github.com/zilliztech/milvus-distributed/internal/distributed/indexnode"
)
type IndexNode struct {
svr *grpcindexnode.Server
tracer opentracing.Tracer
closer io.Closer
}
func NewIndexNode(ctx context.Context) (*IndexNode, error) {
var err error
n := &IndexNode{}
cfg := &config.Configuration{
ServiceName: "indexnode",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
n.tracer, n.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(n.tracer)
svr, err := grpcindexnode.NewServer(ctx)
if err != nil {
return nil, err

View File

@ -2,37 +2,17 @@ package components
import (
"context"
"fmt"
"io"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
grpcindexserver "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice"
)
type IndexService struct {
svr *grpcindexserver.Server
tracer opentracing.Tracer
closer io.Closer
}
func NewIndexService(ctx context.Context) (*IndexService, error) {
var err error
s := &IndexService{}
cfg := &config.Configuration{
ServiceName: "indexservice",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
s.tracer, s.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(s.tracer)
svr, err := grpcindexserver.NewServer(ctx)
if err != nil {
@ -48,7 +28,6 @@ func (s *IndexService) Run() error {
return nil
}
func (s *IndexService) Stop() error {
s.closer.Close()
if err := s.svr.Stop(); err != nil {
return err
}

View File

@ -2,7 +2,9 @@ package components
import (
"context"
"io"
"github.com/opentracing/opentracing-go"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
)
@ -10,6 +12,9 @@ import (
type MasterService struct {
ctx context.Context
svr *msc.Server
tracer opentracing.Tracer
closer io.Closer
}
func NewMasterService(ctx context.Context, factory msgstream.Factory) (*MasterService, error) {
@ -18,7 +23,6 @@ func NewMasterService(ctx context.Context, factory msgstream.Factory) (*MasterSe
if err != nil {
return nil, err
}
return &MasterService{
ctx: ctx,
svr: svr,

View File

@ -3,9 +3,8 @@ package components
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
grpcproxynode "github.com/zilliztech/milvus-distributed/internal/distributed/proxynode"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
)
type ProxyNode struct {
@ -13,7 +12,9 @@ type ProxyNode struct {
}
func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, error) {
var err error
n := &ProxyNode{}
svr, err := grpcproxynode.NewServer(ctx, factory)
if err != nil {
return nil, err
@ -21,12 +22,14 @@ func NewProxyNode(ctx context.Context, factory msgstream.Factory) (*ProxyNode, e
n.svr = svr
return n, nil
}
func (n *ProxyNode) Run() error {
if err := n.svr.Run(); err != nil {
return err
}
return nil
}
func (n *ProxyNode) Stop() error {
if err := n.svr.Stop(); err != nil {
return err

View File

@ -3,9 +3,8 @@ package components
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
grpcproxyservice "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
)
type ProxyService struct {
@ -13,6 +12,7 @@ type ProxyService struct {
}
func NewProxyService(ctx context.Context, factory msgstream.Factory) (*ProxyService, error) {
var err error
service := &ProxyService{}
svr, err := grpcproxyservice.NewServer(ctx, factory)
if err != nil {
@ -21,12 +21,14 @@ func NewProxyService(ctx context.Context, factory msgstream.Factory) (*ProxyServ
service.svr = svr
return service, nil
}
func (s *ProxyService) Run() error {
if err := s.svr.Run(); err != nil {
return err
}
return nil
}
func (s *ProxyService) Stop() error {
if err := s.svr.Stop(); err != nil {
return err

View File

@ -2,7 +2,7 @@
<img src="./figs/proxy.jpeg" width=700>
<img src="./figs/proxy.png" width=700>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.6 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 161 KiB

View File

@ -100,12 +100,12 @@ func TestDataSyncService_Start(t *testing.T) {
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err = insertMsgStream.Produce(&msgPack)
err = insertMsgStream.Produce(ctx, &msgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(&timeTickMsgPack)
err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack)
assert.NoError(t, err)
// dataSync

View File

@ -550,7 +550,7 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID) error {
}
msgPack.Msgs = append(msgPack.Msgs, msg)
return ibNode.completeFlushStream.Produce(&msgPack)
return ibNode.completeFlushStream.Produce(context.TODO(), &msgPack)
}
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
@ -571,7 +571,7 @@ func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
},
}
msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
return ibNode.timeTickStream.Produce(&msgPack)
return ibNode.timeTickStream.Produce(context.TODO(), &msgPack)
}
func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID, currentPosition *internalpb2.MsgPosition) error {
@ -608,7 +608,7 @@ func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID, currentPo
var msgPack = msgstream.MsgPack{
Msgs: []msgstream.TsMsg{msg},
}
return ibNode.segmentStatisticsStream.Produce(&msgPack)
return ibNode.segmentStatisticsStream.Produce(context.TODO(), &msgPack)
}
func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) (*schemapb.CollectionSchema, error) {

View File

@ -619,7 +619,7 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
msgPack := &msgstream.MsgPack{
Msgs: []msgstream.TsMsg{infoMsg},
}
if err = s.segmentInfoStream.Produce(msgPack); err != nil {
if err = s.segmentInfoStream.Produce(s.ctx, msgPack); err != nil {
return err
}
return nil

View File

@ -2,6 +2,8 @@ package grpcdatanode
import (
"context"
"fmt"
"io"
"sync"
"time"
@ -11,6 +13,8 @@ import (
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
dn "github.com/zilliztech/milvus-distributed/internal/datanode"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
@ -35,6 +39,8 @@ type Server struct {
masterService *msc.GrpcClient
dataService *dsc.Client
closer io.Closer
}
func New(ctx context.Context, factory msgstream.Factory) (*Server, error) {
@ -101,13 +107,15 @@ func (s *Server) Run() error {
}
func (s *Server) Stop() error {
if err := s.closer.Close(); err != nil {
return err
}
s.cancel()
var err error
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
}
err = s.impl.Stop()
err := s.impl.Stop()
if err != nil {
return err
}
@ -182,6 +190,21 @@ func (s *Server) init() error {
s.impl.NodeID = dn.Params.NodeID
s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
// TODO
cfg := &config.Configuration{
ServiceName: fmt.Sprintf("data_node_%d", s.impl.NodeID),
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
if err := s.impl.Init(); err != nil {
log.Println("impl init error: ", err)
return err

View File

@ -2,6 +2,8 @@ package grpcdataserviceclient
import (
"context"
"fmt"
"io"
"log"
"net"
"strconv"
@ -12,6 +14,8 @@ import (
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/dataservice"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -31,10 +35,12 @@ type Server struct {
impl *dataservice.Server
grpcServer *grpc.Server
masterClient *msc.GrpcClient
closer io.Closer
}
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
var err error
ctx1, cancel := context.WithCancel(ctx)
s := &Server{
@ -43,7 +49,21 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
grpcErrChan: make(chan error),
}
var err error
// TODO
cfg := &config.Configuration{
ServiceName: "data_service",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
s.impl, err = dataservice.CreateServer(s.ctx, factory)
if err != nil {
return nil, err
@ -120,9 +140,11 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
s.cancel()
var err error
if err = s.closer.Close(); err != nil {
return err
}
s.cancel()
if s.grpcServer != nil {
s.grpcServer.GracefulStop()

View File

@ -2,6 +2,8 @@ package grpcmasterservice
import (
"context"
"fmt"
"io"
"log"
"strconv"
"time"
@ -15,6 +17,8 @@ import (
qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -44,6 +48,8 @@ type Server struct {
connectDataService bool
connectIndexService bool
connectQueryService bool
closer io.Closer
}
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
@ -60,7 +66,21 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
connectQueryService: true,
}
var err error
//TODO
cfg := &config.Configuration{
ServiceName: "proxy_service",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
s.core, err = cms.NewCore(s.ctx, factory)
if err != nil {
return nil, err
@ -81,6 +101,7 @@ func (s *Server) Run() error {
func (s *Server) init() error {
Params.Init()
log.Println("init params done")
err := s.startGrpc()
@ -202,6 +223,9 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
if err := s.closer.Close(); err != nil {
return err
}
if s.proxyService != nil {
_ = s.proxyService.Stop()
}

View File

@ -2,7 +2,6 @@ package grpcproxynode
import (
"context"
"fmt"
"io"
"log"
"net"
@ -19,7 +18,6 @@ import (
grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
@ -55,19 +53,6 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
grpcErrChan: make(chan error),
}
cfg := &config.Configuration{
ServiceName: "proxynode",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
server.tracer, server.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(server.tracer)
server.impl, err = proxynode.NewProxyNodeImpl(server.ctx, factory)
if err != nil {
return nil, err

View File

@ -45,8 +45,9 @@ func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error)
grpcErrChan: make(chan error),
}
// TODO
cfg := &config.Configuration{
ServiceName: "proxyservice",
ServiceName: "proxy_service",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
@ -133,6 +134,9 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
if err := s.closer.Close(); err != nil {
return err
}
s.cancel()
s.closer.Close()
err := s.impl.Stop()

View File

@ -487,10 +487,10 @@ func (c *Core) setMsgStreams() error {
TimeTickMsg: timeTickResult,
}
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
if err := timeTickStream.Broadcast(&msgPack); err != nil {
if err := timeTickStream.Broadcast(c.ctx, &msgPack); err != nil {
return err
}
if err := ddStream.Broadcast(&msgPack); err != nil {
if err := ddStream.Broadcast(c.ctx, &msgPack); err != nil {
return err
}
return nil
@ -508,7 +508,7 @@ func (c *Core) setMsgStreams() error {
CreateCollectionRequest: *req,
}
msgPack.Msgs = append(msgPack.Msgs, collMsg)
if err := ddStream.Broadcast(&msgPack); err != nil {
if err := ddStream.Broadcast(c.ctx, &msgPack); err != nil {
return err
}
return nil
@ -526,7 +526,7 @@ func (c *Core) setMsgStreams() error {
DropCollectionRequest: *req,
}
msgPack.Msgs = append(msgPack.Msgs, collMsg)
if err := ddStream.Broadcast(&msgPack); err != nil {
if err := ddStream.Broadcast(c.ctx, &msgPack); err != nil {
return err
}
return nil
@ -544,7 +544,7 @@ func (c *Core) setMsgStreams() error {
CreatePartitionRequest: *req,
}
msgPack.Msgs = append(msgPack.Msgs, collMsg)
if err := ddStream.Broadcast(&msgPack); err != nil {
if err := ddStream.Broadcast(c.ctx, &msgPack); err != nil {
return err
}
return nil
@ -562,7 +562,7 @@ func (c *Core) setMsgStreams() error {
DropPartitionRequest: *req,
}
msgPack.Msgs = append(msgPack.Msgs, collMsg)
if err := ddStream.Broadcast(&msgPack); err != nil {
if err := ddStream.Broadcast(c.ctx, &msgPack); err != nil {
return err
}
return nil

View File

@ -253,7 +253,7 @@ func TestMasterService(t *testing.T) {
TimeTickMsg: timeTickResult,
}
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
err := proxyTimeTickStream.Broadcast(&msgPack)
err := proxyTimeTickStream.Broadcast(ctx, &msgPack)
assert.Nil(t, err)
ttmsg, ok := <-timeTickStream.Chan()
@ -561,7 +561,7 @@ func TestMasterService(t *testing.T) {
},
}
msgPack.Msgs = append(msgPack.Msgs, segMsg)
err = dataServiceSegmentStream.Broadcast(&msgPack)
err = dataServiceSegmentStream.Broadcast(ctx, &msgPack)
assert.Nil(t, err)
time.Sleep(time.Second)
@ -703,7 +703,7 @@ func TestMasterService(t *testing.T) {
},
}
msgPack.Msgs = append(msgPack.Msgs, segMsg)
err = dataServiceSegmentStream.Broadcast(&msgPack)
err = dataServiceSegmentStream.Broadcast(ctx, &msgPack)
assert.Nil(t, err)
time.Sleep(time.Second)
@ -724,7 +724,7 @@ func TestMasterService(t *testing.T) {
},
}
msgPack.Msgs = []ms.TsMsg{flushMsg}
err = dataServiceSegmentStream.Broadcast(&msgPack)
err = dataServiceSegmentStream.Broadcast(ctx, &msgPack)
assert.Nil(t, err)
time.Sleep(time.Second)

View File

@ -30,8 +30,8 @@ type MsgStream interface {
AsConsumer(channels []string, subName string)
SetRepackFunc(repackFunc RepackFunc)
Produce(*MsgPack) error
Broadcast(*MsgPack) error
Produce(context.Context, *MsgPack) error
Broadcast(context.Context, *MsgPack) error
Consume() *MsgPack
Seek(offset *MsgPosition) error
}

View File

@ -131,6 +131,7 @@ func getInsertTask(reqID msgstream.UniqueID, hashValue uint32) msgstream.TsMsg {
}
func TestStream_task_Insert(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
@ -153,7 +154,7 @@ func TestStream_task_Insert(t *testing.T) {
outputStream.AsConsumer(consumerChannels, consumerSubName)
outputStream.Start()
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}

View File

@ -19,6 +19,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
@ -204,7 +205,7 @@ func (ppRW *propertiesReaderWriter) ForeachKey(handler func(key, val string) err
return nil
}
func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
func (ms *PulsarMsgStream) Produce(ctx context.Context, msgPack *MsgPack) error {
tsMsgs := msgPack.Msgs
if len(tsMsgs) <= 0 {
log.Debug("Warning: Receive empty msgPack")
@ -263,20 +264,26 @@ func (ms *PulsarMsgStream) Produce(msgPack *MsgPack) error {
return err
}
msg := &pulsar.ProducerMessage{Payload: m}
msg := &pulsar.ProducerMessage{Payload: m, Properties: map[string]string{}}
sp, spanCtx := trace.MsgSpanFromCtx(ctx, v.Msgs[i])
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)
if _, err := ms.producers[k].Send(
context.Background(),
spanCtx,
msg,
); err != nil {
trace.LogError(sp, err)
sp.Finish()
return err
}
sp.Finish()
}
}
return nil
}
func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error {
func (ms *PulsarMsgStream) Broadcast(ctx context.Context, msgPack *MsgPack) error {
producerLen := len(ms.producers)
for _, v := range msgPack.Msgs {
mb, err := v.Marshal(v)
@ -289,15 +296,22 @@ func (ms *PulsarMsgStream) Broadcast(msgPack *MsgPack) error {
return err
}
msg := &pulsar.ProducerMessage{Payload: m}
msg := &pulsar.ProducerMessage{Payload: m, Properties: map[string]string{}}
sp, spanCtx := trace.MsgSpanFromCtx(ctx, v)
trace.InjectContextToPulsarMsgProperties(sp.Context(), msg.Properties)
for i := 0; i < producerLen; i++ {
if _, err := ms.producers[i].Send(
context.Background(),
spanCtx,
msg,
); err != nil {
trace.LogError(sp, err)
sp.Finish()
return err
}
}
sp.Finish()
}
return nil
}

View File

@ -238,6 +238,7 @@ func receiveMsg(outputStream msgstream.MsgStream, msgCount int) {
}
func TestStream_PulsarMsgStream_Insert(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
@ -249,7 +250,7 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -261,6 +262,7 @@ func TestStream_PulsarMsgStream_Insert(t *testing.T) {
}
func TestStream_PulsarMsgStream_Delete(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c := funcutil.RandomString(8)
producerChannels := []string{c}
@ -271,7 +273,7 @@ func TestStream_PulsarMsgStream_Delete(t *testing.T) {
//msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kDelete, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -281,6 +283,7 @@ func TestStream_PulsarMsgStream_Delete(t *testing.T) {
}
func TestStream_PulsarMsgStream_Search(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c := funcutil.RandomString(8)
producerChannels := []string{c}
@ -292,7 +295,7 @@ func TestStream_PulsarMsgStream_Search(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearch, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -302,6 +305,7 @@ func TestStream_PulsarMsgStream_Search(t *testing.T) {
}
func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c := funcutil.RandomString(8)
producerChannels := []string{c}
@ -312,7 +316,7 @@ func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kSearchResult, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -322,6 +326,7 @@ func TestStream_PulsarMsgStream_SearchResult(t *testing.T) {
}
func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c := funcutil.RandomString(8)
producerChannels := []string{c}
@ -332,7 +337,7 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -342,6 +347,7 @@ func TestStream_PulsarMsgStream_TimeTick(t *testing.T) {
}
func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
@ -353,7 +359,7 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kTimeTick, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := inputStream.Broadcast(&msgPack)
err := inputStream.Broadcast(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -363,6 +369,7 @@ func TestStream_PulsarMsgStream_BroadCast(t *testing.T) {
}
func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
@ -374,7 +381,7 @@ func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(commonpb.MsgType_kInsert, 3, 3))
inputStream, outputStream := initPulsarStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName, repackFunc)
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -384,6 +391,7 @@ func TestStream_PulsarMsgStream_RepackFunc(t *testing.T) {
}
func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
@ -428,7 +436,7 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
outputStream.Start()
var output msgstream.MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
err := (*inputStream).Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -438,6 +446,7 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
}
func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
@ -480,7 +489,7 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
outputStream.Start()
var output msgstream.MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
err := (*inputStream).Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -490,6 +499,7 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
}
func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
@ -512,7 +522,7 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
outputStream.Start()
var output msgstream.MsgStream = outputStream
err := (*inputStream).Produce(&msgPack)
err := (*inputStream).Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -522,6 +532,7 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
}
func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
@ -538,15 +549,15 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := inputStream.Broadcast(&msgPack0)
err := inputStream.Broadcast(ctx, &msgPack0)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}
err = inputStream.Produce(&msgPack1)
err = inputStream.Produce(ctx, &msgPack1)
if err != nil {
log.Fatalf("produce error = %v", err)
}
err = inputStream.Broadcast(&msgPack2)
err = inputStream.Broadcast(ctx, &msgPack2)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}
@ -556,6 +567,7 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
}
func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
@ -583,15 +595,15 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
msgPack5.Msgs = append(msgPack5.Msgs, getTimeTickMsg(15, 15, 15))
inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := inputStream.Broadcast(&msgPack0)
err := inputStream.Broadcast(ctx, &msgPack0)
assert.Nil(t, err)
err = inputStream.Produce(&msgPack1)
err = inputStream.Produce(ctx, &msgPack1)
assert.Nil(t, err)
err = inputStream.Broadcast(&msgPack2)
err = inputStream.Broadcast(ctx, &msgPack2)
assert.Nil(t, err)
err = inputStream.Produce(&msgPack3)
err = inputStream.Produce(ctx, &msgPack3)
assert.Nil(t, err)
err = inputStream.Broadcast(&msgPack4)
err = inputStream.Broadcast(ctx, &msgPack4)
assert.Nil(t, err)
outputStream.Consume()
@ -599,7 +611,7 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
for _, position := range receivedMsg.StartPositions {
outputStream.Seek(position)
}
err = inputStream.Broadcast(&msgPack5)
err = inputStream.Broadcast(ctx, &msgPack5)
assert.Nil(t, err)
seekMsg := outputStream.Consume()
for _, msg := range seekMsg.Msgs {
@ -610,6 +622,7 @@ func TestStream_PulsarTtMsgStream_Seek(t *testing.T) {
}
func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) {
ctx := context.Background()
pulsarAddress, _ := Params.Load("_PulsarAddress")
c1, c2 := funcutil.RandomString(8), funcutil.RandomString(8)
producerChannels := []string{c1, c2}
@ -627,15 +640,15 @@ func TestStream_PulsarTtMsgStream_UnMarshalHeader(t *testing.T) {
msgPack2.Msgs = append(msgPack2.Msgs, getTimeTickMsg(5, 5, 5))
inputStream, outputStream := initPulsarTtStream(pulsarAddress, producerChannels, consumerChannels, consumerSubName)
err := inputStream.Broadcast(&msgPack0)
err := inputStream.Broadcast(ctx, &msgPack0)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}
err = inputStream.Produce(&msgPack1)
err = inputStream.Produce(ctx, &msgPack1)
if err != nil {
log.Fatalf("produce error = %v", err)
}
err = inputStream.Broadcast(&msgPack2)
err = inputStream.Broadcast(ctx, &msgPack2)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}

View File

@ -125,7 +125,7 @@ func (ms *RmqMsgStream) AsConsumer(channels []string, groupName string) {
}
}
func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error {
func (ms *RmqMsgStream) Produce(ctx context.Context, pack *msgstream.MsgPack) error {
tsMsgs := pack.Msgs
if len(tsMsgs) <= 0 {
log.Printf("Warning: Receive empty msgPack")
@ -185,6 +185,7 @@ func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error {
}
msg := make([]rocksmq.ProducerMessage, 0)
msg = append(msg, *rocksmq.NewProducerMessage(m))
if err := rocksmq.Rmq.Produce(ms.producers[k], msg); err != nil {
return err
}
@ -193,7 +194,7 @@ func (ms *RmqMsgStream) Produce(pack *msgstream.MsgPack) error {
return nil
}
func (ms *RmqMsgStream) Broadcast(msgPack *MsgPack) error {
func (ms *RmqMsgStream) Broadcast(ctx context.Context, msgPack *MsgPack) error {
producerLen := len(ms.producers)
for _, v := range msgPack.Msgs {
mb, err := v.Marshal(v)

View File

@ -252,6 +252,7 @@ func receiveMsg(outputStream msgstream.MsgStream, msgCount int) {
}
func TestStream_RmqMsgStream_Insert(t *testing.T) {
ctx := context.Background()
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerGroupName := "InsertGroup"
@ -263,7 +264,7 @@ func TestStream_RmqMsgStream_Insert(t *testing.T) {
rocksdbName := "/tmp/rocksmq_insert"
etcdKV := initRmq(rocksdbName)
inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerGroupName)
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -273,6 +274,7 @@ func TestStream_RmqMsgStream_Insert(t *testing.T) {
}
func TestStream_RmqMsgStream_Delete(t *testing.T) {
ctx := context.Background()
producerChannels := []string{"delete"}
consumerChannels := []string{"delete"}
consumerSubName := "subDelete"
@ -283,7 +285,7 @@ func TestStream_RmqMsgStream_Delete(t *testing.T) {
rocksdbName := "/tmp/rocksmq_delete"
etcdKV := initRmq(rocksdbName)
inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -292,6 +294,7 @@ func TestStream_RmqMsgStream_Delete(t *testing.T) {
}
func TestStream_RmqMsgStream_Search(t *testing.T) {
ctx := context.Background()
producerChannels := []string{"search"}
consumerChannels := []string{"search"}
consumerSubName := "subSearch"
@ -303,7 +306,7 @@ func TestStream_RmqMsgStream_Search(t *testing.T) {
rocksdbName := "/tmp/rocksmq_search"
etcdKV := initRmq(rocksdbName)
inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -312,6 +315,8 @@ func TestStream_RmqMsgStream_Search(t *testing.T) {
}
func TestStream_RmqMsgStream_SearchResult(t *testing.T) {
ctx := context.Background()
producerChannels := []string{"searchResult"}
consumerChannels := []string{"searchResult"}
consumerSubName := "subSearchResult"
@ -323,7 +328,7 @@ func TestStream_RmqMsgStream_SearchResult(t *testing.T) {
rocksdbName := "/tmp/rocksmq_searchresult"
etcdKV := initRmq(rocksdbName)
inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -332,6 +337,7 @@ func TestStream_RmqMsgStream_SearchResult(t *testing.T) {
}
func TestStream_RmqMsgStream_TimeTick(t *testing.T) {
ctx := context.Background()
producerChannels := []string{"timeTick"}
consumerChannels := []string{"timeTick"}
consumerSubName := "subTimeTick"
@ -343,7 +349,7 @@ func TestStream_RmqMsgStream_TimeTick(t *testing.T) {
rocksdbName := "/tmp/rocksmq_timetick"
etcdKV := initRmq(rocksdbName)
inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerSubName)
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -352,6 +358,7 @@ func TestStream_RmqMsgStream_TimeTick(t *testing.T) {
}
func TestStream_RmqMsgStream_BroadCast(t *testing.T) {
ctx := context.Background()
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
@ -363,7 +370,7 @@ func TestStream_RmqMsgStream_BroadCast(t *testing.T) {
rocksdbName := "/tmp/rocksmq_broadcast"
etcdKV := initRmq(rocksdbName)
inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerSubName)
err := inputStream.Broadcast(&msgPack)
err := inputStream.Broadcast(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -372,6 +379,8 @@ func TestStream_RmqMsgStream_BroadCast(t *testing.T) {
}
func TestStream_RmqMsgStream_RepackFunc(t *testing.T) {
ctx := context.Background()
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
@ -383,7 +392,7 @@ func TestStream_RmqMsgStream_RepackFunc(t *testing.T) {
rocksdbName := "/tmp/rocksmq_repackfunc"
etcdKV := initRmq(rocksdbName)
inputStream, outputStream := initRmqStream(producerChannels, consumerChannels, consumerSubName, repackFunc)
err := inputStream.Produce(&msgPack)
err := inputStream.Produce(ctx, &msgPack)
if err != nil {
log.Fatalf("produce error = %v", err)
}
@ -392,6 +401,8 @@ func TestStream_RmqMsgStream_RepackFunc(t *testing.T) {
}
func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
ctx := context.Background()
producerChannels := []string{"insert1", "insert2"}
consumerChannels := []string{"insert1", "insert2"}
consumerSubName := "subInsert"
@ -410,15 +421,15 @@ func TestStream_PulsarTtMsgStream_Insert(t *testing.T) {
etcdKV := initRmq(rocksdbName)
inputStream, outputStream := initRmqTtStream(producerChannels, consumerChannels, consumerSubName)
err := inputStream.Broadcast(&msgPack0)
err := inputStream.Broadcast(ctx, &msgPack0)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}
err = inputStream.Produce(&msgPack1)
err = inputStream.Produce(ctx, &msgPack1)
if err != nil {
log.Fatalf("produce error = %v", err)
}
err = inputStream.Broadcast(&msgPack2)
err = inputStream.Broadcast(ctx, &msgPack2)
if err != nil {
log.Fatalf("broadcast error = %v", err)
}

View File

@ -12,6 +12,9 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -130,6 +133,21 @@ func (node *NodeImpl) Init() error {
return err
}
// TODO
cfg := &config.Configuration{
ServiceName: fmt.Sprintf("proxy_node_%d", Params.ProxyID),
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
node.closer = closer
// wait for dataservice state changed to Healthy
if node.dataServiceClient != nil {
err = node.waitForServiceReady(node.dataServiceClient, "DataService")
@ -270,6 +288,9 @@ func (node *NodeImpl) Start() error {
}
func (node *NodeImpl) Stop() error {
if err := node.closer.Close(); err != nil {
return err
}
node.cancel()
globalInsertChannelsMap.closeAllMsgStream()

View File

@ -224,7 +224,7 @@ func (it *InsertTask) Execute(ctx context.Context) error {
return err
}
err = stream.Produce(msgPack)
err = stream.Produce(ctx, msgPack)
if err != nil {
it.result.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
it.result.Status.Reason = err.Error()
@ -580,7 +580,7 @@ func (st *SearchTask) Execute(ctx context.Context) error {
Msgs: make([]msgstream.TsMsg, 1),
}
msgPack.Msgs[0] = tsMsg
err := st.queryMsgStream.Produce(msgPack)
err := st.queryMsgStream.Produce(ctx, msgPack)
log.Printf("[NodeImpl] length of searchMsg: %v", len(msgPack.Msgs))
if err != nil {
log.Printf("[NodeImpl] send search request failed: %v", err)

View File

@ -85,7 +85,7 @@ func (tt *timeTick) tick() error {
},
}
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
err := tt.tickMsgStream.Produce(&msgPack)
err := tt.tickMsgStream.Produce(tt.ctx, &msgPack)
if err != nil {
log.Printf("proxynode send time tick error: %v", err)
} else {

View File

@ -59,7 +59,7 @@ func (tt *TimeTickImpl) Start() error {
}
msgPack.Msgs = append(msgPack.Msgs, timeTickMsg)
for _, channel := range tt.channels {
err = channel.Broadcast(&msgPack)
err = channel.Broadcast(tt.ctx, &msgPack)
if err != nil {
log.Println("send time tick error: ", err)
}

View File

@ -1,6 +1,7 @@
package querynode
import (
"context"
"encoding/binary"
"math"
"testing"
@ -15,6 +16,8 @@ import (
// NOTE: start pulsar before test
func TestDataSyncService_Start(t *testing.T) {
ctx := context.Background()
node := newQueryNodeMock()
initTestMeta(t, node, 0, 0)
// test data generate
@ -129,12 +132,12 @@ func TestDataSyncService_Start(t *testing.T) {
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err = insertMsgStream.Produce(&msgPack)
err = insertMsgStream.Produce(ctx, &msgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(&timeTickMsgPack)
err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack)
assert.NoError(t, err)
// dataSync

View File

@ -66,7 +66,7 @@ func (stNode *serviceTimeNode) sendTimeTick(ts Timestamp) error {
},
}
msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
return stNode.timeTickMsgStream.Produce(&msgPack)
return stNode.timeTickMsgStream.Produce(context.TODO(), &msgPack)
}
func newServiceTimeNode(ctx context.Context, replica collectionReplica, factory msgstream.Factory) *serviceTimeNode {

View File

@ -1038,16 +1038,16 @@ func doInsert(ctx context.Context, collectionID UniqueID, partitionID UniqueID,
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err = insertMsgStream.Produce(&msgPack)
err = insertMsgStream.Produce(ctx, &msgPack)
if err != nil {
return err
}
err = insertMsgStream.Broadcast(&timeTickMsgPack)
err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack)
if err != nil {
return err
}
err = ddMsgStream.Broadcast(&timeTickMsgPack)
err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack)
if err != nil {
return err
}
@ -1104,11 +1104,11 @@ func sentTimeTick(ctx context.Context) error {
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err = insertMsgStream.Broadcast(&timeTickMsgPack)
err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack)
if err != nil {
return err
}
err = ddMsgStream.Broadcast(&timeTickMsgPack)
err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack)
if err != nil {
return err
}

View File

@ -19,6 +19,8 @@ import (
"log"
"sync/atomic"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
@ -87,6 +89,20 @@ func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.F
msFactory: factory,
}
cfg := &config.Configuration{
ServiceName: fmt.Sprintf("query_node_%d", node.QueryNodeID),
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
node.closer = closer
node.replica = newCollectionReplicaImpl()
node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
return node
@ -194,6 +210,9 @@ func (node *QueryNode) Start() error {
}
func (node *QueryNode) Stop() error {
if err := node.closer.Close(); err != nil {
return err
}
node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
node.queryNodeLoopCancel()

View File

@ -398,7 +398,7 @@ func (ss *searchService) publishSearchResult(msg msgstream.TsMsg) error {
// msg.SetMsgContext(ctx)
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, msg)
err := ss.searchResultMsgStream.Produce(&msgPack)
err := ss.searchResultMsgStream.Produce(context.TODO(), &msgPack)
return err
}
@ -430,7 +430,7 @@ func (ss *searchService) publishFailedSearchResult(msg msgstream.TsMsg, errMsg s
}
msgPack.Msgs = append(msgPack.Msgs, searchResultMsg)
err := ss.searchResultMsgStream.Produce(&msgPack)
err := ss.searchResultMsgStream.Produce(context.TODO(), &msgPack)
if err != nil {
return err
}

View File

@ -19,6 +19,8 @@ import (
)
func TestSearch_Search(t *testing.T) {
ctx := context.Background()
node := newQueryNodeMock()
initTestMeta(t, node, 0, 0)
@ -104,7 +106,7 @@ func TestSearch_Search(t *testing.T) {
searchStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx)
searchStream.AsProducer(searchProducerChannels)
searchStream.Start()
err = searchStream.Produce(&msgPackSearch)
err = searchStream.Produce(ctx, &msgPackSearch)
assert.NoError(t, err)
node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, msFactory)
@ -198,12 +200,12 @@ func TestSearch_Search(t *testing.T) {
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err = insertMsgStream.Produce(&msgPack)
err = insertMsgStream.Produce(ctx, &msgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(&timeTickMsgPack)
err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack)
assert.NoError(t, err)
// dataSync
@ -216,6 +218,7 @@ func TestSearch_Search(t *testing.T) {
}
func TestSearch_SearchMultiSegments(t *testing.T) {
ctx := context.Background()
pulsarURL := Params.PulsarAddress
const receiveBufSize = 1024
@ -301,7 +304,7 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
searchStream, _ := msFactory.NewMsgStream(node.queryNodeLoopCtx)
searchStream.AsProducer(searchProducerChannels)
searchStream.Start()
err = searchStream.Produce(&msgPackSearch)
err = searchStream.Produce(ctx, &msgPackSearch)
assert.NoError(t, err)
node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, msFactory)
@ -399,12 +402,12 @@ func TestSearch_SearchMultiSegments(t *testing.T) {
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err = insertMsgStream.Produce(&msgPack)
err = insertMsgStream.Produce(ctx, &msgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
err = insertMsgStream.Broadcast(ctx, &timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(&timeTickMsgPack)
err = ddMsgStream.Broadcast(ctx, &timeTickMsgPack)
assert.NoError(t, err)
// dataSync

View File

@ -92,7 +92,7 @@ func (sService *statsService) publicStatistic(fieldStats []*internalpb2.FieldSta
var msgPack = msgstream.MsgPack{
Msgs: []msgstream.TsMsg{msg},
}
err := sService.statsStream.Produce(&msgPack)
err := sService.statsStream.Produce(context.TODO(), &msgPack)
if err != nil {
log.Println(err)
}

View File

@ -3,12 +3,15 @@ package queryservice
import (
"context"
"fmt"
"io"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
nodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/querynode/client"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
@ -64,6 +67,8 @@ type QueryService struct {
enableGrpc bool
msFactory msgstream.Factory
closer io.Closer
}
func (qs *QueryService) Init() error {
@ -76,6 +81,9 @@ func (qs *QueryService) Start() error {
}
func (qs *QueryService) Stop() error {
if err := qs.closer.Close(); err != nil {
return err
}
qs.loopCancel()
qs.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
return nil
@ -615,6 +623,21 @@ func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryServ
qcMutex: &sync.Mutex{},
msFactory: factory,
}
cfg := &config.Configuration{
ServiceName: "query_service",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
service.closer = closer
service.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
return service, nil
}

View File

@ -39,7 +39,7 @@ func (watcher *MsgTimeTickWatcher) StartBackgroundLoop(ctx context.Context) {
msgPack := &ms.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, msg)
for _, stream := range watcher.streams {
if err := stream.Broadcast(msgPack); err != nil {
if err := stream.Broadcast(ctx, msgPack); err != nil {
log.Printf("stream broadcast failed %s", err.Error())
}
}