From 9e96ed487376a5648296254cbe2142a43ec7ea83 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 5 Sep 2024 14:27:04 +0800 Subject: [PATCH] fix: Fix tracing config update logic (#35928) Related to #35927 There are serveral issue this PR addresses: - Use `ResetTraceConfig` method instead init one in update event handler - Implement dynamic stats.Handler to receive tracing config update event - Update `enable_trace` flag when `ResetTraceConfig` is invoked - Change `enable_trace` to `std::atomic` in case of data race Signed-off-by: Congqi Xia --- cmd/roles/roles.go | 4 +- internal/core/src/common/Tracer.cpp | 21 ++- internal/distributed/datacoord/service.go | 7 +- internal/distributed/datanode/service.go | 7 +- internal/distributed/indexnode/service.go | 7 +- internal/distributed/proxy/service.go | 4 +- internal/distributed/querycoord/service.go | 8 +- internal/distributed/querynode/service.go | 10 +- internal/distributed/rootcoord/service.go | 7 +- internal/distributed/streamingnode/service.go | 8 +- internal/util/grpcclient/client.go | 8 +- pkg/tracer/stats_handler.go | 153 ++++++++++++++++++ 12 files changed, 191 insertions(+), 53 deletions(-) create mode 100644 pkg/tracer/stats_handler.go diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 9bc116900a..cab4e652a1 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -528,8 +528,10 @@ func (mr *MilvusRoles) Run() { tracer.SetTracerProvider(exp, params.TraceCfg.SampleFraction.GetAsFloat()) log.Info("Reset tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()), zap.Float64("SampleFraction", params.TraceCfg.SampleFraction.GetAsFloat())) + tracer.NotifyTracerProviderUpdated() + if paramtable.GetRole() == typeutil.QueryNodeRole || paramtable.GetRole() == typeutil.StandaloneRole { - initcore.InitTraceConfig(params) + initcore.ResetTraceConfig(params) log.Info("Reset segcore tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue())) } })) diff --git a/internal/core/src/common/Tracer.cpp b/internal/core/src/common/Tracer.cpp index 31b75cbc9c..8cacba579b 100644 --- a/internal/core/src/common/Tracer.cpp +++ b/internal/core/src/common/Tracer.cpp @@ -14,6 +14,8 @@ #include #include "log/Log.h" +#include +#include #include #include #include @@ -42,12 +44,13 @@ namespace jaeger = opentelemetry::exporter::jaeger; namespace ostream = opentelemetry::exporter::trace; namespace otlp = opentelemetry::exporter::otlp; -static bool enable_trace = true; +static std::atomic enable_trace = true; static std::shared_ptr noop_trace_provider = std::make_shared(); void initTelemetry(const TraceConfig& cfg) { + bool export_created = true; std::unique_ptr exporter; if (cfg.exporter == "stdout") { exporter = ostream::OStreamSpanExporterFactory::Create(); @@ -72,13 +75,13 @@ initTelemetry(const TraceConfig& cfg) { LOG_INFO("init otlp grpc exporter, endpoint: {}", opts.endpoint); } else { LOG_INFO("unknown otlp exporter method: {}", cfg.otlpMethod); - enable_trace = false; + export_created = false; } } else { LOG_INFO("Empty Trace"); - enable_trace = false; + export_created = false; } - if (enable_trace) { + if (export_created) { auto processor = trace_sdk::BatchSpanProcessorFactory::Create( std::move(exporter), {}); resource::ResourceAttributes attributes = { @@ -90,8 +93,10 @@ initTelemetry(const TraceConfig& cfg) { trace_sdk::TracerProviderFactory::Create( std::move(processor), resource, std::move(sampler)); trace::Provider::SetTracerProvider(provider); + enable_trace.store(true); } else { trace::Provider::SetTracerProvider(noop_trace_provider); + enable_trace.store(false); } } @@ -105,7 +110,7 @@ GetTracer() { std::shared_ptr StartSpan(const std::string& name, TraceContext* parentCtx) { trace::StartSpanOptions opts; - if (enable_trace && parentCtx != nullptr && parentCtx->traceID != nullptr && + if (enable_trace.load() && parentCtx != nullptr && parentCtx->traceID != nullptr && parentCtx->spanID != nullptr) { if (EmptyTraceID(parentCtx) || EmptySpanID(parentCtx)) { return noop_trace_provider->GetTracer("noop")->StartSpan("noop"); @@ -122,21 +127,21 @@ StartSpan(const std::string& name, TraceContext* parentCtx) { thread_local std::shared_ptr local_span; void SetRootSpan(std::shared_ptr span) { - if (enable_trace) { + if (enable_trace.load()) { local_span = std::move(span); } } void CloseRootSpan() { - if (enable_trace) { + if (enable_trace.load()) { local_span = nullptr; } } void AddEvent(const std::string& event_label) { - if (enable_trace && local_span != nullptr) { + if (enable_trace.load() && local_span != nullptr) { local_span->AddEvent(event_label); } } diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index e1c013e119..8d7271c256 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -27,7 +27,6 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" @@ -170,14 +169,12 @@ func (s *Server) startGrpcLoop(grpcPort int) { Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead } - opts := tracer.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()), grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - otelgrpc.UnaryServerInterceptor(opts...), logutil.UnaryTraceLoggerInterceptor, interceptor.ClusterValidationUnaryServerInterceptor(), interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 { @@ -189,7 +186,6 @@ func (s *Server) startGrpcLoop(grpcPort int) { streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(), )), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - otelgrpc.StreamServerInterceptor(opts...), logutil.StreamTraceLoggerInterceptor, interceptor.ClusterValidationStreamServerInterceptor(), interceptor.ServerIDValidationStreamServerInterceptor(func() int64 { @@ -199,7 +195,8 @@ func (s *Server) startGrpcLoop(grpcPort int) { return s.serverID.Load() }), streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(), - ))) + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler())) indexpb.RegisterIndexCoordServer(s.grpcServer, s) datapb.RegisterDataCoordServer(s.grpcServer, s) // register the streaming coord grpc service. diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 2e530546d1..7d538d4c93 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -26,7 +26,6 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" clientv3 "go.etcd.io/etcd/client/v3" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" @@ -131,14 +130,12 @@ func (s *Server) startGrpcLoop(grpcPort int) { return } - opts := tracer.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()), grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - otelgrpc.UnaryServerInterceptor(opts...), logutil.UnaryTraceLoggerInterceptor, interceptor.ClusterValidationUnaryServerInterceptor(), interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 { @@ -149,7 +146,6 @@ func (s *Server) startGrpcLoop(grpcPort int) { }), )), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - otelgrpc.StreamServerInterceptor(opts...), logutil.StreamTraceLoggerInterceptor, interceptor.ClusterValidationStreamServerInterceptor(), interceptor.ServerIDValidationStreamServerInterceptor(func() int64 { @@ -158,7 +154,8 @@ func (s *Server) startGrpcLoop(grpcPort int) { } return s.serverID.Load() }), - ))) + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler())) datapb.RegisterDataNodeServer(s.grpcServer, s) ctx, cancel := context.WithCancel(s.ctx) diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 2bb94d6302..e888106f61 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -26,7 +26,6 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" clientv3 "go.etcd.io/etcd/client/v3" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" @@ -105,14 +104,12 @@ func (s *Server) startGrpcLoop(grpcPort int) { Timeout: 10 * time.Second, // Wait 10 second for the ping ack before assuming the connection is dead } - opts := tracer.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()), grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - otelgrpc.UnaryServerInterceptor(opts...), logutil.UnaryTraceLoggerInterceptor, interceptor.ClusterValidationUnaryServerInterceptor(), interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 { @@ -123,7 +120,6 @@ func (s *Server) startGrpcLoop(grpcPort int) { }), )), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - otelgrpc.StreamServerInterceptor(opts...), logutil.StreamTraceLoggerInterceptor, interceptor.ClusterValidationStreamServerInterceptor(), interceptor.ServerIDValidationStreamServerInterceptor(func() int64 { @@ -132,7 +128,8 @@ func (s *Server) startGrpcLoop(grpcPort int) { } return s.serverID.Load() }), - ))) + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler())) workerpb.RegisterIndexNodeServer(s.grpcServer, s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) if err := s.grpcServer.Serve(lis); err != nil { diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 0ef76a8fc2..9fd8c39e52 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -298,13 +298,10 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) { } log.Debug("Get proxy rate limiter done", zap.Int("port", grpcPort)) - opts := tracer.GetInterceptorOpts() - var unaryServerOption grpc.ServerOption if enableCustomInterceptor { unaryServerOption = grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( accesslog.UnaryAccessLogInterceptor, - otelgrpc.UnaryServerInterceptor(opts...), grpc_auth.UnaryServerInterceptor(proxy.AuthenticationInterceptor), proxy.DatabaseInterceptor(), proxy.UnaryServerHookInterceptor(), @@ -325,6 +322,7 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) { grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()), grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()), unaryServerOption, + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), } if Params.TLSMode.GetAsInt() == 1 { diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 8ed1de97be..2e77561dde 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -26,7 +26,6 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" @@ -226,14 +225,12 @@ func (s *Server) startGrpcLoop(grpcPort int) { ctx, cancel := context.WithCancel(s.loopCtx) defer cancel() - opts := tracer.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()), grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - otelgrpc.UnaryServerInterceptor(opts...), logutil.UnaryTraceLoggerInterceptor, interceptor.ClusterValidationUnaryServerInterceptor(), interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 { @@ -244,7 +241,6 @@ func (s *Server) startGrpcLoop(grpcPort int) { }), )), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - otelgrpc.StreamServerInterceptor(opts...), logutil.StreamTraceLoggerInterceptor, interceptor.ClusterValidationStreamServerInterceptor(), interceptor.ServerIDValidationStreamServerInterceptor(func() int64 { @@ -253,7 +249,9 @@ func (s *Server) startGrpcLoop(grpcPort int) { } return s.serverID.Load() }), - ))) + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), + ) querypb.RegisterQueryCoordServer(s.grpcServer, s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 445d30fc52..e60120050d 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -26,7 +26,6 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" clientv3 "go.etcd.io/etcd/client/v3" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" @@ -186,14 +185,13 @@ func (s *Server) startGrpcLoop(grpcPort int) { return } - opts := tracer.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()), grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - otelgrpc.UnaryServerInterceptor(opts...), + // otelgrpc.UnaryServerInterceptor(opts...), logutil.UnaryTraceLoggerInterceptor, interceptor.ClusterValidationUnaryServerInterceptor(), interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 { @@ -204,7 +202,7 @@ func (s *Server) startGrpcLoop(grpcPort int) { }), )), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - otelgrpc.StreamServerInterceptor(opts...), + // otelgrpc.StreamServerInterceptor(opts...), logutil.StreamTraceLoggerInterceptor, interceptor.ClusterValidationStreamServerInterceptor(), interceptor.ServerIDValidationStreamServerInterceptor(func() int64 { @@ -213,7 +211,9 @@ func (s *Server) startGrpcLoop(grpcPort int) { } return s.serverID.Load() }), - ))) + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), + ) querypb.RegisterQueryNodeServer(s.grpcServer, s) ctx, cancel := context.WithCancel(s.ctx) diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index f36f64355c..9f50661fe9 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -26,7 +26,6 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" @@ -272,14 +271,12 @@ func (s *Server) startGrpcLoop(port int) { ctx, cancel := context.WithCancel(s.ctx) defer cancel() - opts := tracer.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()), grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - otelgrpc.UnaryServerInterceptor(opts...), logutil.UnaryTraceLoggerInterceptor, interceptor.ClusterValidationUnaryServerInterceptor(), interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 { @@ -290,7 +287,6 @@ func (s *Server) startGrpcLoop(port int) { }), )), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - otelgrpc.StreamServerInterceptor(opts...), logutil.StreamTraceLoggerInterceptor, interceptor.ClusterValidationStreamServerInterceptor(), interceptor.ServerIDValidationStreamServerInterceptor(func() int64 { @@ -299,7 +295,8 @@ func (s *Server) startGrpcLoop(port int) { } return s.serverID.Load() }), - ))) + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler())) rootcoordpb.RegisterRootCoordServer(s.grpcServer, s) go funcutil.CheckGrpcReady(ctx, s.grpcErrChan) diff --git a/internal/distributed/streamingnode/service.go b/internal/distributed/streamingnode/service.go index eec82a7cff..460b534bde 100644 --- a/internal/distributed/streamingnode/service.go +++ b/internal/distributed/streamingnode/service.go @@ -29,7 +29,6 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/tikv/client-go/v2/txnkv" clientv3 "go.etcd.io/etcd/client/v3" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -331,26 +330,25 @@ func (s *Server) initGRPCServer() { serverIDGetter := func() int64 { return s.session.ServerID } - opts := tracer.GetInterceptorOpts() s.grpcServer = grpc.NewServer( grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), grpc.MaxRecvMsgSize(cfg.ServerMaxRecvSize.GetAsInt()), grpc.MaxSendMsgSize(cfg.ServerMaxSendSize.GetAsInt()), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - otelgrpc.UnaryServerInterceptor(opts...), logutil.UnaryTraceLoggerInterceptor, interceptor.ClusterValidationUnaryServerInterceptor(), interceptor.ServerIDValidationUnaryServerInterceptor(serverIDGetter), streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(), )), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - otelgrpc.StreamServerInterceptor(opts...), logutil.StreamTraceLoggerInterceptor, interceptor.ClusterValidationStreamServerInterceptor(), interceptor.ServerIDValidationStreamServerInterceptor(serverIDGetter), streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(), - ))) + )), + grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()), + ) } // allocateAddress allocates a available address for streamingnode grpc server. diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go index 8a6912df56..dd9e805da5 100644 --- a/internal/util/grpcclient/client.go +++ b/internal/util/grpcclient/client.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/errors" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" @@ -250,7 +249,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { return err } - opts := tracer.GetInterceptorOpts() dialContext, cancel := context.WithTimeout(ctx, c.DialTimeout) var conn *grpc.ClientConn @@ -271,12 +269,10 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { grpc.UseCompressor(compress), ), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otelgrpc.UnaryClientInterceptor(opts...), interceptor.ClusterInjectionUnaryClientInterceptor(), interceptor.ServerIDInjectionUnaryClientInterceptor(c.GetNodeID()), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otelgrpc.StreamClientInterceptor(opts...), interceptor.ClusterInjectionStreamClientInterceptor(), interceptor.ServerIDInjectionStreamClientInterceptor(c.GetNodeID()), )), @@ -298,6 +294,7 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { grpc.FailOnNonTempDialError(true), grpc.WithReturnConnectionError(), grpc.WithDisableRetry(), + grpc.WithStatsHandler(tracer.GetDynamicOtelGrpcClientStatsHandler()), ) } else { conn, err = grpc.DialContext( @@ -311,12 +308,10 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { grpc.UseCompressor(compress), ), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - otelgrpc.UnaryClientInterceptor(opts...), interceptor.ClusterInjectionUnaryClientInterceptor(), interceptor.ServerIDInjectionUnaryClientInterceptor(c.GetNodeID()), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - otelgrpc.StreamClientInterceptor(opts...), interceptor.ClusterInjectionStreamClientInterceptor(), interceptor.ServerIDInjectionStreamClientInterceptor(c.GetNodeID()), )), @@ -338,6 +333,7 @@ func (c *ClientBase[T]) connect(ctx context.Context) error { grpc.FailOnNonTempDialError(true), grpc.WithReturnConnectionError(), grpc.WithDisableRetry(), + grpc.WithStatsHandler(tracer.GetDynamicOtelGrpcClientStatsHandler()), ) } diff --git a/pkg/tracer/stats_handler.go b/pkg/tracer/stats_handler.go new file mode 100644 index 0000000000..d00735c1c3 --- /dev/null +++ b/pkg/tracer/stats_handler.go @@ -0,0 +1,153 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracer + +import ( + "context" + "sync" + "sync/atomic" + + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel" + "google.golang.org/grpc/stats" +) + +var ( + dynamicServerHandler *dynamicOtelGrpcStatsHandler + initServerOnce sync.Once + dynamicClientHandler *dynamicOtelGrpcStatsHandler + initClientOnce sync.Once +) + +// dynamicOtelGrpcStatsHandler wraps otelgprc.StatsHandler +// to implement runtime configuration update. +type dynamicOtelGrpcStatsHandler struct { + handler atomic.Pointer[stats.Handler] +} + +func getDynamicServerHandler() *dynamicOtelGrpcStatsHandler { + initServerOnce.Do(func() { + statsHandler := otelgrpc.NewServerHandler( + otelgrpc.WithInterceptorFilter(filterFunc), + otelgrpc.WithTracerProvider(otel.GetTracerProvider()), + ) + + dynamicServerHandler = &dynamicOtelGrpcStatsHandler{} + dynamicServerHandler.handler.Store(&statsHandler) + }) + + return dynamicServerHandler +} + +func getDynamicClientHandler() *dynamicOtelGrpcStatsHandler { + initClientOnce.Do(func() { + statsHandler := otelgrpc.NewClientHandler( + otelgrpc.WithInterceptorFilter(filterFunc), + otelgrpc.WithTracerProvider(otel.GetTracerProvider()), + ) + + dynamicClientHandler = &dynamicOtelGrpcStatsHandler{} + dynamicClientHandler.handler.Store(&statsHandler) + }) + + return dynamicClientHandler +} + +// GetDynamicOtelGrpcServerStatsHandler returns the singleton instance of grpc server stats.Handler +func GetDynamicOtelGrpcServerStatsHandler() stats.Handler { + return getDynamicServerHandler() +} + +// GetDynamicOtelGrpcClientStatsHandler returns the singleton instance of grpc client stats.Handler +func GetDynamicOtelGrpcClientStatsHandler() stats.Handler { + return getDynamicClientHandler() +} + +func NotifyTracerProviderUpdated() { + serverhandler := getDynamicServerHandler() + statsHandler := otelgrpc.NewServerHandler( + otelgrpc.WithInterceptorFilter(filterFunc), + otelgrpc.WithTracerProvider(otel.GetTracerProvider()), + ) + + serverhandler.setHandler(statsHandler) + + clientHandler := getDynamicClientHandler() + statsHandler = otelgrpc.NewClientHandler( + otelgrpc.WithInterceptorFilter(filterFunc), + otelgrpc.WithTracerProvider(otel.GetTracerProvider()), + ) + clientHandler.setHandler(statsHandler) +} + +func (h *dynamicOtelGrpcStatsHandler) getHandler() stats.Handler { + return *h.handler.Load() +} + +func (h *dynamicOtelGrpcStatsHandler) setHandler(handler stats.Handler) { + h.handler.Store(&handler) +} + +// TagRPC can attach some information to the given context. +// The context used for the rest lifetime of the RPC will be derived from +// the returned context. +func (h *dynamicOtelGrpcStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + handler := h.getHandler() + if handler == nil { + return ctx + } + + return handler.TagRPC(ctx, info) +} + +// HandleRPC processes the RPC stats. +func (h *dynamicOtelGrpcStatsHandler) HandleRPC(ctx context.Context, stats stats.RPCStats) { + handler := h.getHandler() + if handler == nil { + return + } + + handler.HandleRPC(ctx, stats) +} + +// TagConn can attach some information to the given context. +// The returned context will be used for stats handling. +// For conn stats handling, the context used in HandleConn for this +// connection will be derived from the context returned. +// For RPC stats handling, +// - On server side, the context used in HandleRPC for all RPCs on this +// +// connection will be derived from the context returned. +// - On client side, the context is not derived from the context returned. +func (h *dynamicOtelGrpcStatsHandler) TagConn(ctx context.Context, tagInfo *stats.ConnTagInfo) context.Context { + handler := h.getHandler() + if handler == nil { + return ctx + } + + return handler.TagConn(ctx, tagInfo) +} + +// HandleConn processes the Conn stats. +func (h *dynamicOtelGrpcStatsHandler) HandleConn(ctx context.Context, stats stats.ConnStats) { + handler := h.getHandler() + if handler == nil { + return + } + + handler.HandleConn(ctx, stats) +}