Release flowgraph properly

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2021-03-22 12:49:50 -05:00 committed by yefu.chen
parent 3be6dde459
commit 1634d75980
19 changed files with 178 additions and 218 deletions

View File

@ -81,6 +81,10 @@ func (ddNode *ddNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Con
// TODO: add error handling
}
if msMsg == nil {
return []Msg{}, ctx
}
ddNode.ddMsg = &ddMsg{
collectionRecords: make(map[UniqueID][]*metaOperateRecord),
partitionRecords: make(map[UniqueID][]*metaOperateRecord),

View File

@ -40,6 +40,10 @@ func (fdmNode *filterDmNode) Operate(ctx context.Context, in []Msg) ([]Msg, cont
// TODO: add error handling
}
if msgStreamMsg == nil || ddMsg == nil {
return []Msg{}, ctx
}
fdmNode.ddMsg = ddMsg
var iMsg = insertMsg{

View File

@ -30,6 +30,10 @@ func (gcNode *gcNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Con
// TODO: add error handling
}
if gcMsg == nil {
return []Msg{}, ctx
}
// drop collections
for _, collectionID := range gcMsg.gcRecord.collections {
err := gcNode.replica.removeCollection(collectionID)

View File

@ -100,6 +100,10 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, c
// TODO: add error handling
}
if iMsg == nil {
return []Msg{}, ctx
}
// Updating segment statistics
uniqueSeg := make(map[UniqueID]int64)
for _, msg := range iMsg.insertMessages {

View File

@ -13,7 +13,6 @@ import (
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -29,6 +28,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
)
type Server struct {
@ -143,18 +143,15 @@ func (s *Server) init() error {
Params.LoadFromEnv()
Params.LoadFromArgs()
dn.Params.Init()
dn.Params.Port = Params.Port
dn.Params.IP = Params.IP
log.Debug("DataNode port", zap.Int("port", Params.Port))
// TODO
cfg := &config.Configuration{
ServiceName: fmt.Sprintf("data_node ip: %s, port: %d", Params.IP, Params.Port),
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
tracer, closer, err := trace.InitTracing(fmt.Sprintf("data_node ip: %s, port: %d", Params.IP, Params.Port))
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
log.Error("data_node", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
@ -210,10 +207,6 @@ func (s *Server) init() error {
panic(err)
}
dn.Params.Init()
dn.Params.Port = Params.Port
dn.Params.IP = Params.IP
s.datanode.NodeID = dn.Params.NodeID
s.datanode.UpdateStateCode(internalpb.StateCode_Initializing)

View File

@ -2,7 +2,6 @@ package grpcdataserviceclient
import (
"context"
"fmt"
"io"
"math"
"net"
@ -18,13 +17,13 @@ import (
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/dataservice"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
@ -56,21 +55,6 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
grpcErrChan: make(chan error),
}
// TODO
cfg := &config.Configuration{
ServiceName: "data_service",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
s.dataService, err = dataservice.CreateServer(s.ctx, factory)
if err != nil {
return nil, err
@ -82,6 +66,13 @@ func (s *Server) init() error {
Params.Init()
Params.LoadFromEnv()
tracer, closer, err := trace.InitTracing("data_service")
if err != nil {
log.Error("data_service", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start

View File

@ -2,6 +2,8 @@ package grpcindexnode
import (
"context"
"fmt"
"io"
"math"
"net"
"strconv"
@ -20,6 +22,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
"google.golang.org/grpc"
)
@ -33,6 +36,8 @@ type Server struct {
loopCtx context.Context
loopCancel func()
loopWg sync.WaitGroup
closer io.Closer
}
func (s *Server) Run() error {
@ -87,6 +92,18 @@ func (s *Server) init() error {
Params.LoadFromEnv()
Params.LoadFromArgs()
indexnode.Params.Init()
indexnode.Params.Port = Params.Port
indexnode.Params.IP = Params.IP
indexnode.Params.Address = Params.Address
tracer, closer, err := trace.InitTracing(fmt.Sprintf("index_node_%d", indexnode.Params.NodeID))
if err != nil {
log.Error("index_node", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
defer func() {
@ -114,11 +131,6 @@ func (s *Server) init() error {
}
s.indexnode.SetIndexServiceClient(s.indexServiceClient)
indexnode.Params.Init()
indexnode.Params.Port = Params.Port
indexnode.Params.IP = Params.IP
indexnode.Params.Address = Params.Address
s.indexnode.UpdateStateCode(internalpb.StateCode_Initializing)
err = s.indexnode.Init()
@ -137,6 +149,9 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
if err := s.closer.Close(); err != nil {
return err
}
s.loopCancel()
if s.indexnode != nil {
s.indexnode.Stop()

View File

@ -2,7 +2,6 @@ package grpcindexservice
import (
"context"
"fmt"
"io"
"math"
"net"
@ -13,7 +12,6 @@ import (
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/indexservice"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -21,6 +19,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"google.golang.org/grpc"
)
@ -57,6 +56,13 @@ func (s *Server) init() error {
Params.Init()
indexservice.Params.Init()
tracer, closer, err := trace.InitTracing("index_service")
if err != nil {
log.Error("index_service", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
s.loopWg.Add(1)
go s.startGrpcLoop(Params.ServicePort)
// wait for grpc indexservice loop start
@ -131,7 +137,6 @@ func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFil
func (s *Server) NotifyBuildIndex(ctx context.Context, nty *indexpb.NotifyBuildIndexRequest) (*commonpb.Status, error) {
return s.indexservice.NotifyBuildIndex(ctx, nty)
}
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.loopWg.Done()
@ -178,19 +183,5 @@ func NewServer(ctx context.Context) (*Server, error) {
grpcErrChan: make(chan error),
}
cfg := &config.Configuration{
ServiceName: "index_service",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
return s, nil
}

View File

@ -2,7 +2,6 @@ package grpcmasterservice
import (
"context"
"fmt"
"io"
"math"
"net"
@ -12,7 +11,6 @@ import (
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -24,6 +22,7 @@ import (
cms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
@ -70,21 +69,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
connectQueryService: true,
}
//TODO
cfg := &config.Configuration{
ServiceName: "master_service",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
var err error
s.masterService, err = cms.NewCore(s.ctx, factory)
if err != nil {
return nil, err
@ -105,11 +90,22 @@ func (s *Server) Run() error {
func (s *Server) init() error {
Params.Init()
cms.Params.Init()
log.Debug("grpc init done ...")
ctx := context.Background()
tracer, closer, err := trace.InitTracing("master_service")
if err != nil {
log.Error("master_service", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
log.Debug("init params done")
err := s.startGrpc()
err = s.startGrpc()
if err != nil {
return err
}
@ -177,9 +173,6 @@ func (s *Server) init() error {
panic(err)
}
}
cms.Params.Init()
log.Debug("grpc init done ...")
if err := s.masterService.Init(); err != nil {
return err
}
@ -235,8 +228,10 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
if err := s.closer.Close(); err != nil {
return err
if s.closer != nil {
if err := s.closer.Close(); err != nil {
return err
}
}
if s.proxyService != nil {
_ = s.proxyService.Stop()

View File

@ -21,7 +21,6 @@ import (
grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -30,6 +29,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proxynode"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
)
const (
@ -126,28 +126,27 @@ func (s *Server) init() error {
}
Params.LoadFromEnv()
Params.LoadFromArgs()
Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
proxynode.Params.Init()
log.Debug("init params done ...")
proxynode.Params.NetworkPort = Params.Port
proxynode.Params.IP = Params.IP
proxynode.Params.NetworkAddress = Params.Address
// for purpose of ID Allocator
proxynode.Params.MasterAddress = Params.MasterAddress
tracer, closer, err := trace.InitTracing(fmt.Sprintf("proxy_node ip: %s, port: %d", Params.IP, Params.Port))
if err != nil {
log.Error("proxy_node", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
log.Debug("proxynode", zap.String("proxy host", Params.IP))
log.Debug("proxynode", zap.Int("proxy port", Params.Port))
log.Debug("proxynode", zap.String("proxy address", Params.Address))
// TODO
cfg := &config.Configuration{
ServiceName: fmt.Sprintf("proxy_node ip: %s, port: %d", Params.IP, Params.Port),
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
defer func() {
if err != nil {
err2 := s.Stop()
@ -226,14 +225,6 @@ func (s *Server) init() error {
s.proxynode.SetQueryServiceClient(s.queryServiceClient)
log.Debug("set query service client ...")
proxynode.Params.Init()
log.Debug("init params done ...")
proxynode.Params.NetworkPort = Params.Port
proxynode.Params.IP = Params.IP
proxynode.Params.NetworkAddress = Params.Address
// for purpose of ID Allocator
proxynode.Params.MasterAddress = Params.MasterAddress
s.proxynode.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("proxynode",
zap.Any("state of proxynode", internalpb.StateCode_Initializing))

View File

@ -2,7 +2,6 @@ package grpcproxyservice
import (
"context"
"fmt"
"io"
"math"
"net"
@ -13,7 +12,6 @@ import (
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -22,6 +20,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
"github.com/zilliztech/milvus-distributed/internal/proxyservice"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
"google.golang.org/grpc"
)
@ -49,20 +48,6 @@ func NewServer(ctx1 context.Context, factory msgstream.Factory) (*Server, error)
grpcErrChan: make(chan error),
}
// TODO
cfg := &config.Configuration{
ServiceName: "proxy_service",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
server.tracer, server.closer, err = cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(server.tracer)
server.proxyservice, err = proxyservice.NewProxyService(server.ctx, factory)
if err != nil {
return nil, err
@ -88,6 +73,13 @@ func (s *Server) init() error {
proxyservice.Params.Init()
log.Debug("init params done")
tracer, closer, err := trace.InitTracing("proxy_service")
if err != nil {
log.Error("proxy_service", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
s.wg.Add(1)
go s.startGrpcLoop(Params.ServicePort)
// wait for grpc server loop start

View File

@ -16,7 +16,6 @@ import (
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -32,6 +31,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
qn "github.com/zilliztech/milvus-distributed/internal/querynode"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
@ -72,17 +72,14 @@ func (s *Server) init() error {
Params.LoadFromEnv()
Params.LoadFromArgs()
// TODO
cfg := &config.Configuration{
ServiceName: fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort),
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
qn.Params.Init()
qn.Params.QueryNodeIP = Params.QueryNodeIP
qn.Params.QueryNodePort = int64(Params.QueryNodePort)
qn.Params.QueryNodeID = Params.QueryNodeID
tracer, closer, err := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort))
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
log.Error("query_node", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
@ -189,11 +186,6 @@ func (s *Server) init() error {
panic(err)
}
qn.Params.Init()
qn.Params.QueryNodeIP = Params.QueryNodeIP
qn.Params.QueryNodePort = int64(Params.QueryNodePort)
qn.Params.QueryNodeID = Params.QueryNodeID
s.querynode.UpdateStateCode(internalpb.StateCode_Initializing)
if err := s.querynode.Init(); err != nil {

View File

@ -2,6 +2,7 @@ package grpcqueryservice
import (
"context"
"io"
"math"
"net"
"strconv"
@ -17,6 +18,7 @@ import (
qs "github.com/zilliztech/milvus-distributed/internal/queryservice"
"github.com/zilliztech/milvus-distributed/internal/types"
"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -40,6 +42,8 @@ type Server struct {
dataService *dsc.Client
masterService *msc.GrpcClient
closer io.Closer
}
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
@ -75,6 +79,14 @@ func (s *Server) Run() error {
func (s *Server) init() error {
ctx := context.Background()
Params.Init()
qs.Params.Init()
tracer, closer, err := trace.InitTracing("query_service")
if err != nil {
log.Error("query_service", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
s.closer = closer
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
@ -129,7 +141,6 @@ func (s *Server) init() error {
panic(err)
}
qs.Params.Init()
s.queryservice.UpdateStateCode(internalpb.StateCode_Initializing)
if err := s.queryservice.Init(); err != nil {
@ -174,7 +185,11 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
err := s.queryservice.Stop()
err := s.closer.Close()
if err != nil {
return err
}
err = s.queryservice.Stop()
s.loopCancel()
if s.grpcServer != nil {
s.grpcServer.GracefulStop()

View File

@ -3,15 +3,12 @@ package indexnode
import (
"context"
"errors"
"fmt"
"io"
"math/rand"
"time"
"go.uber.org/zap"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/kv"
miniokv "github.com/zilliztech/milvus-distributed/internal/kv/minio"
"github.com/zilliztech/milvus-distributed/internal/log"
@ -95,21 +92,6 @@ func (i *IndexNode) Init() error {
return err
}
// TODO
cfg := &config.Configuration{
ServiceName: fmt.Sprintf("index_node_%d", Params.NodeID),
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
i.closer = closer
option := &miniokv.Option{
Address: Params.MinIOAddress,
AccessKeyID: Params.MinIOAccessKeyID,
@ -140,9 +122,6 @@ func (i *IndexNode) Start() error {
// Close closes the server.
func (i *IndexNode) Stop() error {
if err := i.closer.Close(); err != nil {
return err
}
i.loopCancel()
if i.sched != nil {
i.sched.Close()

View File

@ -44,9 +44,9 @@ func (stNode *serviceTimeNode) Operate(ctx context.Context, in []Msg) ([]Msg, co
ts := stNode.replica.getTSafe(stNode.collectionID)
if ts != nil {
ts.set(serviceTimeMsg.timeRange.timestampMax)
log.Debug("update tSafe:",
zap.Int64("tSafe", int64(serviceTimeMsg.timeRange.timestampMax)),
zap.Int64("collectionID", stNode.collectionID))
//log.Debug("update tSafe:",
// zap.Int64("tSafe", int64(serviceTimeMsg.timeRange.timestampMax)),
// zap.Int64("collectionID", stNode.collectionID))
}
if err := stNode.sendTimeTick(serviceTimeMsg.timeRange.timestampMax); err != nil {

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"io"
"math/rand"
"sort"
"strconv"
@ -12,8 +11,6 @@ import (
"sync/atomic"
"time"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/config"
"go.uber.org/zap"
nodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/querynode/client"
@ -51,8 +48,6 @@ type QueryService struct {
enableGrpc bool
msFactory msgstream.Factory
closer io.Closer
}
func (qs *QueryService) Init() error {
@ -65,9 +60,6 @@ func (qs *QueryService) Start() error {
}
func (qs *QueryService) Stop() error {
if err := qs.closer.Close(); err != nil {
return err
}
qs.loopCancel()
qs.UpdateStateCode(internalpb.StateCode_Abnormal)
return nil
@ -677,20 +669,6 @@ func NewQueryService(ctx context.Context, factory msgstream.Factory) (*QueryServ
msFactory: factory,
}
cfg := &config.Configuration{
ServiceName: "query_service",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
service.closer = closer
service.UpdateStateCode(internalpb.StateCode_Abnormal)
return service, nil
}

View File

@ -54,7 +54,7 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) {
select {
case <-ctx.Done():
wg.Done()
fmt.Println(nodeCtx.node.Name(), "closed")
//fmt.Println(nodeCtx.node.Name(), "closed")
return
default:
// inputs from inputsMessages for Operate
@ -64,7 +64,7 @@ func (nodeCtx *nodeCtx) Start(ctx context.Context, wg *sync.WaitGroup) {
var res []Msg
var sp opentracing.Span
if !nodeCtx.node.IsInputNode() {
msgCtx = nodeCtx.collectInputMessages()
msgCtx = nodeCtx.collectInputMessages(ctx)
inputs = nodeCtx.inputMessages
}
n := nodeCtx.node
@ -108,7 +108,7 @@ func (nodeCtx *nodeCtx) ReceiveMsg(ctx context.Context, wg *sync.WaitGroup, msg
wg.Done()
}
func (nodeCtx *nodeCtx) collectInputMessages() context.Context {
func (nodeCtx *nodeCtx) collectInputMessages(exitCtx context.Context) context.Context {
var opts []opentracing.StartSpanOption
inputsNum := len(nodeCtx.inputChannels)
@ -119,17 +119,21 @@ func (nodeCtx *nodeCtx) collectInputMessages() context.Context {
// and move them to inputMessages.
for i := 0; i < inputsNum; i++ {
channel := nodeCtx.inputChannels[i]
msgWithCtx, ok := <-channel
if !ok {
// TODO: add status
log.Println("input channel closed")
select {
case <-exitCtx.Done():
return nil
}
nodeCtx.inputMessages[i] = msgWithCtx.msg
if msgWithCtx.ctx != nil {
sp, _ := trace.StartSpanFromContext(msgWithCtx.ctx)
opts = append(opts, opentracing.ChildOf(sp.Context()))
sp.Finish()
case msgWithCtx, ok := <-channel:
if !ok {
// TODO: add status
log.Println("input channel closed")
return nil
}
nodeCtx.inputMessages[i] = msgWithCtx.msg
if msgWithCtx.ctx != nil {
sp, _ := trace.StartSpanFromContext(msgWithCtx.ctx)
opts = append(opts, opentracing.ChildOf(sp.Context()))
sp.Finish()
}
}
}
@ -157,12 +161,16 @@ func (nodeCtx *nodeCtx) collectInputMessages() context.Context {
for nodeCtx.inputMessages[i].TimeTick() != latestTime {
fmt.Println("try to align timestamp, t1 =", latestTime, ", t2 =", nodeCtx.inputMessages[i].TimeTick())
channel := nodeCtx.inputChannels[i]
msg, ok := <-channel
if !ok {
log.Println("input channel closed")
select {
case <-exitCtx.Done():
return
case msg, ok := <-channel:
if !ok {
log.Println("input channel closed")
return
}
nodeCtx.inputMessages[i] = msg.msg
}
nodeCtx.inputMessages[i] = msg.msg
}
}
sign <- struct{}{}

View File

@ -2,6 +2,7 @@ package trace
import (
"context"
"io"
"runtime"
"strings"
@ -11,10 +12,30 @@ import (
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
func InitTracing(serviceName string) (opentracing.Tracer, io.Closer, error) {
if true {
cfg, err := config.FromEnv()
if err != nil {
return nil, nil, errors.New("trace from env error")
}
cfg.ServiceName = serviceName
return cfg.NewTracer()
}
cfg := &config.Configuration{
ServiceName: serviceName,
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
return cfg.NewTracer()
}
func StartSpanFromContext(ctx context.Context, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
if ctx == nil {
return noopSpan(), ctx

View File

@ -3,32 +3,14 @@ package trace
import (
"context"
"fmt"
"io"
"testing"
"errors"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
"github.com/uber/jaeger-client-go/config"
)
func InitTracing() io.Closer {
cfg := &config.Configuration{
ServiceName: "test",
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
opentracing.SetGlobalTracer(tracer)
return closer
}
type simpleStruct struct {
name string
value string
@ -36,7 +18,8 @@ type simpleStruct struct {
func TestTracing(t *testing.T) {
//Already Init in each framework, this can be ignored in debug
closer := InitTracing()
tracer, closer, _ := InitTracing("test")
opentracing.SetGlobalTracer(tracer)
defer closer.Close()
// context normally can be propagated through func params