fix: Fix tracing config update logic (#35928)

Related to #35927

There are several issues this PR addresses:
- Use the `ResetTraceConfig` method instead of the init one in the config update event handler
- Implement a dynamic `stats.Handler` to receive tracing config update events (see the sketch below)
- Update the `enable_trace` flag when `ResetTraceConfig` is invoked
- Change `enable_trace` to `std::atomic<bool>` to avoid a data race

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2024-09-05 14:27:04 +08:00 committed by GitHub
parent 5e3f700e5d
commit 9e96ed4873
12 changed files with 191 additions and 53 deletions
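For orientation, here is a hedged sketch of the consumer side of this change. Only `GetDynamicOtelGrpcServerStatsHandler` and `NotifyTracerProviderUpdated` come from this PR; the harness and the import path are assumptions.

package main

import (
	"google.golang.org/grpc"

	"github.com/milvus-io/milvus/pkg/tracer" // import path assumed from this PR's layout
)

func main() {
	// Each server registers the dynamic wrapper exactly once; the wrapper's
	// identity never changes, so the grpc.Server never needs rebuilding.
	srv := grpc.NewServer(
		grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
	)
	defer srv.Stop()

	// A later tracing-config update swaps the otelgrpc handler inside the
	// wrapper, so running servers and clients pick up the new provider.
	tracer.NotifyTracerProviderUpdated()
}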


@@ -528,8 +528,10 @@ func (mr *MilvusRoles) Run() {
tracer.SetTracerProvider(exp, params.TraceCfg.SampleFraction.GetAsFloat())
log.Info("Reset tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()), zap.Float64("SampleFraction", params.TraceCfg.SampleFraction.GetAsFloat()))
tracer.NotifyTracerProviderUpdated()
if paramtable.GetRole() == typeutil.QueryNodeRole || paramtable.GetRole() == typeutil.StandaloneRole {
initcore.InitTraceConfig(params)
initcore.ResetTraceConfig(params)
log.Info("Reset segcore tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()))
}
}))
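The hunk above truncates the enclosing update handler. Below is a hedged reconstruction of its shape: every call appears in the diff, while the function name, the exporter parameter type, and the import paths are assumptions.

package roles // illustrative package name

import (
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.uber.org/zap"

	// Milvus-internal packages; exact import paths are assumed.
	"github.com/milvus-io/milvus/internal/util/initcore"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/tracer"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func onTraceConfigUpdate(exp sdktrace.SpanExporter, params *paramtable.ComponentParam) {
	// Rebuild the Go tracer provider from the updated config.
	tracer.SetTracerProvider(exp, params.TraceCfg.SampleFraction.GetAsFloat())
	log.Info("Reset tracer finished",
		zap.String("Exporter", params.TraceCfg.Exporter.GetValue()),
		zap.Float64("SampleFraction", params.TraceCfg.SampleFraction.GetAsFloat()))

	// Rebuild the dynamic gRPC stats handlers against the new provider.
	tracer.NotifyTracerProviderUpdated()

	// Only roles that embed segcore also reset the C++ tracer.
	if paramtable.GetRole() == typeutil.QueryNodeRole || paramtable.GetRole() == typeutil.StandaloneRole {
		initcore.ResetTraceConfig(params)
		log.Info("Reset segcore tracer finished",
			zap.String("Exporter", params.TraceCfg.Exporter.GetValue()))
	}
}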


@@ -14,6 +14,8 @@
#include <opentelemetry/exporters/otlp/otlp_http_exporter_options.h>
#include "log/Log.h"
#include <atomic>
#include <cstddef>
#include <iomanip>
#include <iostream>
#include <utility>
@@ -42,12 +44,13 @@ namespace jaeger = opentelemetry::exporter::jaeger;
namespace ostream = opentelemetry::exporter::trace;
namespace otlp = opentelemetry::exporter::otlp;
static bool enable_trace = true;
static std::atomic<bool> enable_trace = true;
static std::shared_ptr<trace::TracerProvider> noop_trace_provider =
std::make_shared<opentelemetry::trace::NoopTracerProvider>();
void
initTelemetry(const TraceConfig& cfg) {
bool export_created = true;
std::unique_ptr<opentelemetry::sdk::trace::SpanExporter> exporter;
if (cfg.exporter == "stdout") {
exporter = ostream::OStreamSpanExporterFactory::Create();
@@ -72,13 +75,13 @@ initTelemetry(const TraceConfig& cfg) {
LOG_INFO("init otlp grpc exporter, endpoint: {}", opts.endpoint);
} else {
LOG_INFO("unknown otlp exporter method: {}", cfg.otlpMethod);
enable_trace = false;
export_created = false;
}
} else {
LOG_INFO("Empty Trace");
enable_trace = false;
export_created = false;
}
if (enable_trace) {
if (export_created) {
auto processor = trace_sdk::BatchSpanProcessorFactory::Create(
std::move(exporter), {});
resource::ResourceAttributes attributes = {
@@ -90,8 +93,10 @@ initTelemetry(const TraceConfig& cfg) {
trace_sdk::TracerProviderFactory::Create(
std::move(processor), resource, std::move(sampler));
trace::Provider::SetTracerProvider(provider);
enable_trace.store(true);
} else {
trace::Provider::SetTracerProvider(noop_trace_provider);
enable_trace.store(false);
}
}
@@ -105,7 +110,7 @@ GetTracer() {
std::shared_ptr<trace::Span>
StartSpan(const std::string& name, TraceContext* parentCtx) {
trace::StartSpanOptions opts;
if (enable_trace && parentCtx != nullptr && parentCtx->traceID != nullptr &&
if (enable_trace.load() && parentCtx != nullptr && parentCtx->traceID != nullptr &&
parentCtx->spanID != nullptr) {
if (EmptyTraceID(parentCtx) || EmptySpanID(parentCtx)) {
return noop_trace_provider->GetTracer("noop")->StartSpan("noop");
@@ -122,21 +127,21 @@ StartSpan(const std::string& name, TraceContext* parentCtx) {
thread_local std::shared_ptr<trace::Span> local_span;
void
SetRootSpan(std::shared_ptr<trace::Span> span) {
if (enable_trace) {
if (enable_trace.load()) {
local_span = std::move(span);
}
}
void
CloseRootSpan() {
if (enable_trace) {
if (enable_trace.load()) {
local_span = nullptr;
}
}
void
AddEvent(const std::string& event_label) {
if (enable_trace && local_span != nullptr) {
if (enable_trace.load() && local_span != nullptr) {
local_span->AddEvent(event_label);
}
}
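The flag used to be a plain `bool` written by the config handler while request threads read it concurrently, which is a data race in C++; the change above makes it `std::atomic<bool>` with explicit `load()`/`store()`. For contrast, a minimal Go rendition of the same discipline (illustrative only, not part of this PR):

package main

import (
	"fmt"
	"sync/atomic"
)

// enableTrace mirrors the C++ flag: stored by the config-update handler,
// loaded on every span operation. atomic.Bool makes both sides race-free.
var enableTrace atomic.Bool

// resetTraceConfig is the writer side, invoked on config updates.
func resetTraceConfig(exporterCreated bool) {
	enableTrace.Store(exporterCreated)
}

// startSpan is the hot reader side: a single atomic load, no mutex.
func startSpan(name string) {
	if !enableTrace.Load() {
		return // tracing disabled: behave like a no-op span
	}
	fmt.Println("span started:", name)
}

func main() {
	resetTraceConfig(true)
	startSpan("query")
	resetTraceConfig(false)
	startSpan("ignored")
}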


@@ -27,7 +27,6 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
@@ -170,14 +169,12 @@ func (s *Server) startGrpcLoop(grpcPort int) {
Timeout: 10 * time.Second, // Wait 10 seconds for the ping ack before assuming the connection is dead
}
opts := tracer.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
otelgrpc.UnaryServerInterceptor(opts...),
logutil.UnaryTraceLoggerInterceptor,
interceptor.ClusterValidationUnaryServerInterceptor(),
interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 {
@@ -189,7 +186,6 @@ func (s *Server) startGrpcLoop(grpcPort int) {
streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
otelgrpc.StreamServerInterceptor(opts...),
logutil.StreamTraceLoggerInterceptor,
interceptor.ClusterValidationStreamServerInterceptor(),
interceptor.ServerIDValidationStreamServerInterceptor(func() int64 {
@@ -199,7 +195,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
return s.serverID.Load()
}),
streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(),
)))
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
indexpb.RegisterIndexCoordServer(s.grpcServer, s)
datapb.RegisterDataCoordServer(s.grpcServer, s)
// register the streaming coord grpc service.


@@ -26,7 +26,6 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
@@ -131,14 +130,12 @@ func (s *Server) startGrpcLoop(grpcPort int) {
return
}
opts := tracer.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
otelgrpc.UnaryServerInterceptor(opts...),
logutil.UnaryTraceLoggerInterceptor,
interceptor.ClusterValidationUnaryServerInterceptor(),
interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 {
@@ -149,7 +146,6 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
otelgrpc.StreamServerInterceptor(opts...),
logutil.StreamTraceLoggerInterceptor,
interceptor.ClusterValidationStreamServerInterceptor(),
interceptor.ServerIDValidationStreamServerInterceptor(func() int64 {
@@ -158,7 +154,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}
return s.serverID.Load()
}),
)))
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
datapb.RegisterDataNodeServer(s.grpcServer, s)
ctx, cancel := context.WithCancel(s.ctx)


@@ -26,7 +26,6 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
@@ -105,14 +104,12 @@ func (s *Server) startGrpcLoop(grpcPort int) {
Timeout: 10 * time.Second, // Wait 10 seconds for the ping ack before assuming the connection is dead
}
opts := tracer.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
otelgrpc.UnaryServerInterceptor(opts...),
logutil.UnaryTraceLoggerInterceptor,
interceptor.ClusterValidationUnaryServerInterceptor(),
interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 {
@@ -123,7 +120,6 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
otelgrpc.StreamServerInterceptor(opts...),
logutil.StreamTraceLoggerInterceptor,
interceptor.ClusterValidationStreamServerInterceptor(),
interceptor.ServerIDValidationStreamServerInterceptor(func() int64 {
@@ -132,7 +128,8 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}
return s.serverID.Load()
}),
)))
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
workerpb.RegisterIndexNodeServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {


@@ -298,13 +298,10 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) {
}
log.Debug("Get proxy rate limiter done", zap.Int("port", grpcPort))
opts := tracer.GetInterceptorOpts()
var unaryServerOption grpc.ServerOption
if enableCustomInterceptor {
unaryServerOption = grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
accesslog.UnaryAccessLogInterceptor,
otelgrpc.UnaryServerInterceptor(opts...),
grpc_auth.UnaryServerInterceptor(proxy.AuthenticationInterceptor),
proxy.DatabaseInterceptor(),
proxy.UnaryServerHookInterceptor(),
@@ -325,6 +322,7 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) {
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()),
unaryServerOption,
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
}
if Params.TLSMode.GetAsInt() == 1 {


@@ -26,7 +26,6 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
@@ -226,14 +225,12 @@ func (s *Server) startGrpcLoop(grpcPort int) {
ctx, cancel := context.WithCancel(s.loopCtx)
defer cancel()
opts := tracer.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
otelgrpc.UnaryServerInterceptor(opts...),
logutil.UnaryTraceLoggerInterceptor,
interceptor.ClusterValidationUnaryServerInterceptor(),
interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 {
@@ -244,7 +241,6 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
otelgrpc.StreamServerInterceptor(opts...),
logutil.StreamTraceLoggerInterceptor,
interceptor.ClusterValidationStreamServerInterceptor(),
interceptor.ServerIDValidationStreamServerInterceptor(func() int64 {
@@ -253,7 +249,9 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}
return s.serverID.Load()
}),
)))
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
)
querypb.RegisterQueryCoordServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)


@@ -26,7 +26,6 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
@@ -186,14 +185,13 @@ func (s *Server) startGrpcLoop(grpcPort int) {
return
}
opts := tracer.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
otelgrpc.UnaryServerInterceptor(opts...),
// otelgrpc.UnaryServerInterceptor(opts...),
logutil.UnaryTraceLoggerInterceptor,
interceptor.ClusterValidationUnaryServerInterceptor(),
interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 {
@@ -204,7 +202,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
otelgrpc.StreamServerInterceptor(opts...),
// otelgrpc.StreamServerInterceptor(opts...),
logutil.StreamTraceLoggerInterceptor,
interceptor.ClusterValidationStreamServerInterceptor(),
interceptor.ServerIDValidationStreamServerInterceptor(func() int64 {
@@ -213,7 +211,9 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}
return s.serverID.Load()
}),
)))
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
)
querypb.RegisterQueryNodeServer(s.grpcServer, s)
ctx, cancel := context.WithCancel(s.ctx)


@@ -26,7 +26,6 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
@@ -272,14 +271,12 @@ func (s *Server) startGrpcLoop(port int) {
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
opts := tracer.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize.GetAsInt()),
grpc.MaxSendMsgSize(Params.ServerMaxSendSize.GetAsInt()),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
otelgrpc.UnaryServerInterceptor(opts...),
logutil.UnaryTraceLoggerInterceptor,
interceptor.ClusterValidationUnaryServerInterceptor(),
interceptor.ServerIDValidationUnaryServerInterceptor(func() int64 {
@@ -290,7 +287,6 @@ func (s *Server) startGrpcLoop(port int) {
}),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
otelgrpc.StreamServerInterceptor(opts...),
logutil.StreamTraceLoggerInterceptor,
interceptor.ClusterValidationStreamServerInterceptor(),
interceptor.ServerIDValidationStreamServerInterceptor(func() int64 {
@@ -299,7 +295,8 @@ func (s *Server) startGrpcLoop(port int) {
}
return s.serverID.Load()
}),
)))
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()))
rootcoordpb.RegisterRootCoordServer(s.grpcServer, s)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)


@@ -29,7 +29,6 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/tikv/client-go/v2/txnkv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
@@ -331,26 +330,25 @@ func (s *Server) initGRPCServer() {
serverIDGetter := func() int64 {
return s.session.ServerID
}
opts := tracer.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
grpc.KeepaliveEnforcementPolicy(kaep),
grpc.KeepaliveParams(kasp),
grpc.MaxRecvMsgSize(cfg.ServerMaxRecvSize.GetAsInt()),
grpc.MaxSendMsgSize(cfg.ServerMaxSendSize.GetAsInt()),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
otelgrpc.UnaryServerInterceptor(opts...),
logutil.UnaryTraceLoggerInterceptor,
interceptor.ClusterValidationUnaryServerInterceptor(),
interceptor.ServerIDValidationUnaryServerInterceptor(serverIDGetter),
streamingserviceinterceptor.NewStreamingServiceUnaryServerInterceptor(),
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
otelgrpc.StreamServerInterceptor(opts...),
logutil.StreamTraceLoggerInterceptor,
interceptor.ClusterValidationStreamServerInterceptor(),
interceptor.ServerIDValidationStreamServerInterceptor(serverIDGetter),
streamingserviceinterceptor.NewStreamingServiceStreamServerInterceptor(),
)))
)),
grpc.StatsHandler(tracer.GetDynamicOtelGrpcServerStatsHandler()),
)
}
// allocateAddress allocates an available address for the streamingnode grpc server.


@@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/errors"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
@@ -250,7 +249,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
return err
}
opts := tracer.GetInterceptorOpts()
dialContext, cancel := context.WithTimeout(ctx, c.DialTimeout)
var conn *grpc.ClientConn
@@ -271,12 +269,10 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
grpc.UseCompressor(compress),
),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(opts...),
interceptor.ClusterInjectionUnaryClientInterceptor(),
interceptor.ServerIDInjectionUnaryClientInterceptor(c.GetNodeID()),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(opts...),
interceptor.ClusterInjectionStreamClientInterceptor(),
interceptor.ServerIDInjectionStreamClientInterceptor(c.GetNodeID()),
)),
@@ -298,6 +294,7 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
grpc.FailOnNonTempDialError(true),
grpc.WithReturnConnectionError(),
grpc.WithDisableRetry(),
grpc.WithStatsHandler(tracer.GetDynamicOtelGrpcClientStatsHandler()),
)
} else {
conn, err = grpc.DialContext(
@@ -311,12 +308,10 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
grpc.UseCompressor(compress),
),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otelgrpc.UnaryClientInterceptor(opts...),
interceptor.ClusterInjectionUnaryClientInterceptor(),
interceptor.ServerIDInjectionUnaryClientInterceptor(c.GetNodeID()),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
otelgrpc.StreamClientInterceptor(opts...),
interceptor.ClusterInjectionStreamClientInterceptor(),
interceptor.ServerIDInjectionStreamClientInterceptor(c.GetNodeID()),
)),
@@ -338,6 +333,7 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
grpc.FailOnNonTempDialError(true),
grpc.WithReturnConnectionError(),
grpc.WithDisableRetry(),
grpc.WithStatsHandler(tracer.GetDynamicOtelGrpcClientStatsHandler()),
)
}
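On the client side the change is symmetric: the otelgrpc interceptors leave the dial options and `grpc.WithStatsHandler` carries tracing instead. A minimal dial sketch under that assumption (address, credentials, and timeout are placeholders):

package main

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/milvus-io/milvus/pkg/tracer" // import path assumed from this PR's layout
)

// dialTraced attaches one dynamic stats handler that instruments every RPC
// on the connection and follows tracer-provider updates without redialing.
func dialTraced(ctx context.Context, addr string) (*grpc.ClientConn, error) {
	dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	return grpc.DialContext(
		dialCtx,
		addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()), // placeholder credentials
		grpc.WithStatsHandler(tracer.GetDynamicOtelGrpcClientStatsHandler()),
	)
}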

pkg/tracer/stats_handler.go (new file, 153 lines)

@@ -0,0 +1,153 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracer
import (
"context"
"sync"
"sync/atomic"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"google.golang.org/grpc/stats"
)
var (
dynamicServerHandler *dynamicOtelGrpcStatsHandler
initServerOnce sync.Once
dynamicClientHandler *dynamicOtelGrpcStatsHandler
initClientOnce sync.Once
)
// dynamicOtelGrpcStatsHandler wraps the otelgrpc stats.Handler
// to support runtime configuration updates.
type dynamicOtelGrpcStatsHandler struct {
handler atomic.Pointer[stats.Handler]
}
func getDynamicServerHandler() *dynamicOtelGrpcStatsHandler {
initServerOnce.Do(func() {
statsHandler := otelgrpc.NewServerHandler(
otelgrpc.WithInterceptorFilter(filterFunc),
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
)
dynamicServerHandler = &dynamicOtelGrpcStatsHandler{}
dynamicServerHandler.handler.Store(&statsHandler)
})
return dynamicServerHandler
}
func getDynamicClientHandler() *dynamicOtelGrpcStatsHandler {
initClientOnce.Do(func() {
statsHandler := otelgrpc.NewClientHandler(
otelgrpc.WithInterceptorFilter(filterFunc),
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
)
dynamicClientHandler = &dynamicOtelGrpcStatsHandler{}
dynamicClientHandler.handler.Store(&statsHandler)
})
return dynamicClientHandler
}
// GetDynamicOtelGrpcServerStatsHandler returns the singleton instance of the grpc server stats.Handler
func GetDynamicOtelGrpcServerStatsHandler() stats.Handler {
return getDynamicServerHandler()
}
// GetDynamicOtelGrpcClientStatsHandler returns the singleton instance of the grpc client stats.Handler
func GetDynamicOtelGrpcClientStatsHandler() stats.Handler {
return getDynamicClientHandler()
}
func NotifyTracerProviderUpdated() {
serverHandler := getDynamicServerHandler()
statsHandler := otelgrpc.NewServerHandler(
otelgrpc.WithInterceptorFilter(filterFunc),
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
)
serverHandler.setHandler(statsHandler)
clientHandler := getDynamicClientHandler()
statsHandler = otelgrpc.NewClientHandler(
otelgrpc.WithInterceptorFilter(filterFunc),
otelgrpc.WithTracerProvider(otel.GetTracerProvider()),
)
clientHandler.setHandler(statsHandler)
}
func (h *dynamicOtelGrpcStatsHandler) getHandler() stats.Handler {
return *h.handler.Load()
}
func (h *dynamicOtelGrpcStatsHandler) setHandler(handler stats.Handler) {
h.handler.Store(&handler)
}
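A note on the `handler atomic.Pointer[stats.Handler]` field: `atomic.Pointer[T]` holds a `*T`, so the code stores a pointer to the interface value and dereferences it in `getHandler`. A tiny standalone illustration of that idiom (toy types, not part of the PR):

package main

import (
	"fmt"
	"sync/atomic"
)

type greeter interface{ greet() string }

type english struct{}

func (english) greet() string { return "hello" }

type french struct{}

func (french) greet() string { return "bonjour" }

func main() {
	// Store a pointer to the interface value, dereference on load.
	var current atomic.Pointer[greeter]

	g := greeter(english{})
	current.Store(&g)
	fmt.Println((*current.Load()).greet()) // hello

	// Swapping the delegate is one atomic store; concurrent readers always
	// see either the old or the new value, never a torn one.
	g2 := greeter(french{})
	current.Store(&g2)
	fmt.Println((*current.Load()).greet()) // bonjour
}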
// TagRPC can attach some information to the given context.
// The context used for the rest lifetime of the RPC will be derived from
// the returned context.
func (h *dynamicOtelGrpcStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
handler := h.getHandler()
if handler == nil {
return ctx
}
return handler.TagRPC(ctx, info)
}
// HandleRPC processes the RPC stats.
func (h *dynamicOtelGrpcStatsHandler) HandleRPC(ctx context.Context, stats stats.RPCStats) {
handler := h.getHandler()
if handler == nil {
return
}
handler.HandleRPC(ctx, stats)
}
// TagConn can attach some information to the given context.
// The returned context will be used for stats handling.
// For conn stats handling, the context used in HandleConn for this
// connection will be derived from the context returned.
// For RPC stats handling,
//   - On server side, the context used in HandleRPC for all RPCs on this
//     connection will be derived from the context returned.
//   - On client side, the context is not derived from the context returned.
func (h *dynamicOtelGrpcStatsHandler) TagConn(ctx context.Context, tagInfo *stats.ConnTagInfo) context.Context {
handler := h.getHandler()
if handler == nil {
return ctx
}
return handler.TagConn(ctx, tagInfo)
}
// HandleConn processes the Conn stats.
func (h *dynamicOtelGrpcStatsHandler) HandleConn(ctx context.Context, stats stats.ConnStats) {
handler := h.getHandler()
if handler == nil {
return
}
handler.HandleConn(ctx, stats)
}
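Finally, a hedged test sketch of the invariant that makes the scheme work (not part of the PR; assumes it sits next to stats_handler.go in package tracer, and that otelgrpc handlers compare by pointer identity):

package tracer

import "testing"

func TestNotifyTracerProviderUpdated(t *testing.T) {
	wrapperBefore := GetDynamicOtelGrpcServerStatsHandler()
	delegateBefore := getDynamicServerHandler().getHandler()

	NotifyTracerProviderUpdated()

	// The wrapper handed to grpc.NewServer is stable across updates...
	if GetDynamicOtelGrpcServerStatsHandler() != wrapperBefore {
		t.Fatal("wrapper identity should be stable across updates")
	}
	// ...while the otelgrpc delegate inside it is rebuilt.
	if getDynamicServerHandler().getHandler() == delegateBefore {
		t.Fatal("delegate should be rebuilt by the update notification")
	}
}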