From 74429c902d0404483cababf902d480d07159e7f9 Mon Sep 17 00:00:00 2001 From: "peng.xu" Date: Fri, 25 Oct 2019 20:24:03 +0800 Subject: [PATCH] implement plugin framework for tracer --- shards/mishards/__init__.py | 15 +++++--- shards/mishards/settings.py | 1 + shards/requirements.txt | 1 + shards/{tracing => tracer}/__init__.py | 0 shards/tracer/factory.py | 48 +++++++++++++++++++++++++ shards/tracer/plugins/jaeger_factory.py | 33 +++++++++++++++++ shards/tracing/factory.py | 40 --------------------- 7 files changed, 93 insertions(+), 45 deletions(-) rename shards/{tracing => tracer}/__init__.py (100%) create mode 100644 shards/tracer/factory.py create mode 100644 shards/tracer/plugins/jaeger_factory.py delete mode 100644 shards/tracing/factory.py diff --git a/shards/mishards/__init__.py b/shards/mishards/__init__.py index 7db3d8cb5e..c5ecbe93fc 100644 --- a/shards/mishards/__init__.py +++ b/shards/mishards/__init__.py @@ -19,17 +19,22 @@ def create_app(testing_config=None): from sd import ProviderManager sd_proiver_class = ProviderManager.get_provider(settings.SD_PROVIDER) - discover = sd_proiver_class(settings=settings.SD_PROVIDER_SETTINGS, conn_mgr=connect_mgr) + discover = sd_proiver_class(settings=settings.SD_PROVIDER_SETTINGS, + conn_mgr=connect_mgr) - from tracing.factory import TracerFactory from mishards.grpc_utils import GrpcSpanDecorator - tracer = TracerFactory.new_tracer(config.TRACING_TYPE, settings.TracingConfig, - span_decorator=GrpcSpanDecorator()) + from tracer.factory import TracerFactory + tracer = TracerFactory(config.TRACING_PLUGIN_PATH).create(config.TRACING_TYPE, + settings.TracingConfig, + span_decorator=GrpcSpanDecorator()) from mishards.routings import RouterFactory router = RouterFactory.new_router(config.ROUTER_CLASS_NAME, connect_mgr) - grpc_server.init_app(conn_mgr=connect_mgr, tracer=tracer, router=router, discover=discover) + grpc_server.init_app(conn_mgr=connect_mgr, + tracer=tracer, + router=router, + discover=discover) from mishards import exception_handlers diff --git a/shards/mishards/settings.py b/shards/mishards/settings.py index 21a3bb7a65..08550374ad 100644 --- a/shards/mishards/settings.py +++ b/shards/mishards/settings.py @@ -74,6 +74,7 @@ class TracingConfig: class DefaultConfig: SQLALCHEMY_DATABASE_URI = env.str('SQLALCHEMY_DATABASE_URI') SQL_ECHO = env.bool('SQL_ECHO', False) + TRACING_PLUGIN_PATH = env.str('TRACING_PLUGIN_PATH', '') TRACING_TYPE = env.str('TRACING_TYPE', '') ROUTER_CLASS_NAME = env.str('ROUTER_CLASS_NAME', 'FileBasedHashRingRouter') diff --git a/shards/requirements.txt b/shards/requirements.txt index ae224e92ed..14bdde2a06 100644 --- a/shards/requirements.txt +++ b/shards/requirements.txt @@ -34,3 +34,4 @@ urllib3==1.25.3 jaeger-client>=3.4.0 grpcio-opentracing>=1.0 mock==2.0.0 +pluginbase==1.0.0 diff --git a/shards/tracing/__init__.py b/shards/tracer/__init__.py similarity index 100% rename from shards/tracing/__init__.py rename to shards/tracer/__init__.py diff --git a/shards/tracer/factory.py b/shards/tracer/factory.py new file mode 100644 index 0000000000..7ffed32bd0 --- /dev/null +++ b/shards/tracer/factory.py @@ -0,0 +1,48 @@ +import os +import logging +from functools import partial +from pluginbase import PluginBase + + +logger = logging.getLogger(__name__) + +here = os.path.abspath(os.path.dirname(__file__)) +get_path = partial(os.path.join, here) + +PLUGIN_PACKAGE_NAME = 'tracer.plugins' +plugin_base = PluginBase(package=PLUGIN_PACKAGE_NAME, + searchpath=[get_path('./plugins')]) + +class TracerFactory(object): + def __init__(self, searchpath=None): + self.plugin_package_name = PLUGIN_PACKAGE_NAME + self.tracer_map = {} + searchpath = searchpath if searchpath else [] + searchpath = [searchpath] if isinstance(searchpath, str) else searchpath + self.source = plugin_base.make_plugin_source( + searchpath=searchpath, identifier=self.__class__.__name__) + + for plugin_name in self.source.list_plugins(): + plugin = self.source.load_plugin(plugin_name) + plugin.setup(self) + + def on_plugin_setup(self, plugin_class): + name = getattr(plugin_class, 'name', plugin_class.__name__) + self.tracer_map[name.lower()] = plugin_class + + def plugin(self, name): + return self.tracer_map.get(name, None) + + def create(self, + tracer_type, + tracer_config, + span_decorator=None, + **kwargs): + if not tracer_type: + return Tracer() + plugin_class = self.plugin(tracer_type.lower()) + if not plugin_class: + raise RuntimeError('Tracer Plugin \'{}\' not installed!'.format(tracer_type)) + + tracer = plugin_class.create(tracer_config, span_decorator=span_decorator, **kwargs) + return tracer diff --git a/shards/tracer/plugins/jaeger_factory.py b/shards/tracer/plugins/jaeger_factory.py new file mode 100644 index 0000000000..ec71fe427f --- /dev/null +++ b/shards/tracer/plugins/jaeger_factory.py @@ -0,0 +1,33 @@ +import logging +from jaeger_client import Config +from grpc_opentracing.grpcext import intercept_server +from grpc_opentracing import open_tracing_server_interceptor +from tracer import Tracer + +logger = logging.getLogger(__name__) + +PLUGIN_NAME = __name__ + +class JaegerFactory: + name = 'jaeger' + @classmethod + def create(cls, tracer_config, span_decorator=None, **kwargs): + tracing_config = tracer_config.TRACING_CONFIG + service_name = tracer_config.TRACING_SERVICE_NAME + validate = tracer_config.TRACING_VALIDATE + config = Config(config=tracing_config, + service_name=service_name, + validate=validate) + + tracer = config.initialize_tracer() + tracer_interceptor = open_tracing_server_interceptor( + tracer, + log_payloads=tracer_config.TRACING_LOG_PAYLOAD, + span_decorator=span_decorator) + + return Tracer(tracer, tracer_interceptor, intercept_server) + + +def setup(app): + logger.debug('Plugin \'{}\' Installed In Package: {}'.format(PLUGIN_NAME, app.plugin_package_name)) + app.on_plugin_setup(JaegerFactory) diff --git a/shards/tracing/factory.py b/shards/tracing/factory.py deleted file mode 100644 index 14fcde2eb3..0000000000 --- a/shards/tracing/factory.py +++ /dev/null @@ -1,40 +0,0 @@ -import logging -from jaeger_client import Config -from grpc_opentracing.grpcext import intercept_server -from grpc_opentracing import open_tracing_server_interceptor - -from tracing import (Tracer, empty_server_interceptor_decorator) - -logger = logging.getLogger(__name__) - - -class TracerFactory: - @classmethod - def new_tracer(cls, - tracer_type, - tracer_config, - span_decorator=None, - **kwargs): - if not tracer_type: - return Tracer() - config = tracer_config.TRACING_CONFIG - service_name = tracer_config.TRACING_SERVICE_NAME - validate = tracer_config.TRACING_VALIDATE - # if not tracer_type: - # tracer_type = 'jaeger' - # config = tracer_config.DEFAULT_TRACING_CONFIG - - if tracer_type.lower() == 'jaeger': - config = Config(config=config, - service_name=service_name, - validate=validate) - - tracer = config.initialize_tracer() - tracer_interceptor = open_tracing_server_interceptor( - tracer, - log_payloads=tracer_config.TRACING_LOG_PAYLOAD, - span_decorator=span_decorator) - - return Tracer(tracer, tracer_interceptor, intercept_server) - - assert False, 'Unsupported tracer type: {}'.format(tracer_type)