mirror of https://github.com/milvus-io/milvus.git
implement plugin framework for tracer
parent
c73d1d8342
commit
74429c902d
|
@ -19,17 +19,22 @@ def create_app(testing_config=None):
|
||||||
from sd import ProviderManager
|
from sd import ProviderManager
|
||||||
|
|
||||||
sd_proiver_class = ProviderManager.get_provider(settings.SD_PROVIDER)
|
sd_proiver_class = ProviderManager.get_provider(settings.SD_PROVIDER)
|
||||||
discover = sd_proiver_class(settings=settings.SD_PROVIDER_SETTINGS, conn_mgr=connect_mgr)
|
discover = sd_proiver_class(settings=settings.SD_PROVIDER_SETTINGS,
|
||||||
|
conn_mgr=connect_mgr)
|
||||||
|
|
||||||
from tracing.factory import TracerFactory
|
|
||||||
from mishards.grpc_utils import GrpcSpanDecorator
|
from mishards.grpc_utils import GrpcSpanDecorator
|
||||||
tracer = TracerFactory.new_tracer(config.TRACING_TYPE, settings.TracingConfig,
|
from tracer.factory import TracerFactory
|
||||||
|
tracer = TracerFactory(config.TRACING_PLUGIN_PATH).create(config.TRACING_TYPE,
|
||||||
|
settings.TracingConfig,
|
||||||
span_decorator=GrpcSpanDecorator())
|
span_decorator=GrpcSpanDecorator())
|
||||||
|
|
||||||
from mishards.routings import RouterFactory
|
from mishards.routings import RouterFactory
|
||||||
router = RouterFactory.new_router(config.ROUTER_CLASS_NAME, connect_mgr)
|
router = RouterFactory.new_router(config.ROUTER_CLASS_NAME, connect_mgr)
|
||||||
|
|
||||||
grpc_server.init_app(conn_mgr=connect_mgr, tracer=tracer, router=router, discover=discover)
|
grpc_server.init_app(conn_mgr=connect_mgr,
|
||||||
|
tracer=tracer,
|
||||||
|
router=router,
|
||||||
|
discover=discover)
|
||||||
|
|
||||||
from mishards import exception_handlers
|
from mishards import exception_handlers
|
||||||
|
|
||||||
|
|
|
@ -74,6 +74,7 @@ class TracingConfig:
|
||||||
class DefaultConfig:
|
class DefaultConfig:
|
||||||
SQLALCHEMY_DATABASE_URI = env.str('SQLALCHEMY_DATABASE_URI')
|
SQLALCHEMY_DATABASE_URI = env.str('SQLALCHEMY_DATABASE_URI')
|
||||||
SQL_ECHO = env.bool('SQL_ECHO', False)
|
SQL_ECHO = env.bool('SQL_ECHO', False)
|
||||||
|
TRACING_PLUGIN_PATH = env.str('TRACING_PLUGIN_PATH', '')
|
||||||
TRACING_TYPE = env.str('TRACING_TYPE', '')
|
TRACING_TYPE = env.str('TRACING_TYPE', '')
|
||||||
ROUTER_CLASS_NAME = env.str('ROUTER_CLASS_NAME', 'FileBasedHashRingRouter')
|
ROUTER_CLASS_NAME = env.str('ROUTER_CLASS_NAME', 'FileBasedHashRingRouter')
|
||||||
|
|
||||||
|
|
|
@ -34,3 +34,4 @@ urllib3==1.25.3
|
||||||
jaeger-client>=3.4.0
|
jaeger-client>=3.4.0
|
||||||
grpcio-opentracing>=1.0
|
grpcio-opentracing>=1.0
|
||||||
mock==2.0.0
|
mock==2.0.0
|
||||||
|
pluginbase==1.0.0
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
from functools import partial
|
||||||
|
from pluginbase import PluginBase
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
here = os.path.abspath(os.path.dirname(__file__))
|
||||||
|
get_path = partial(os.path.join, here)
|
||||||
|
|
||||||
|
PLUGIN_PACKAGE_NAME = 'tracer.plugins'
|
||||||
|
plugin_base = PluginBase(package=PLUGIN_PACKAGE_NAME,
|
||||||
|
searchpath=[get_path('./plugins')])
|
||||||
|
|
||||||
|
class TracerFactory(object):
|
||||||
|
def __init__(self, searchpath=None):
|
||||||
|
self.plugin_package_name = PLUGIN_PACKAGE_NAME
|
||||||
|
self.tracer_map = {}
|
||||||
|
searchpath = searchpath if searchpath else []
|
||||||
|
searchpath = [searchpath] if isinstance(searchpath, str) else searchpath
|
||||||
|
self.source = plugin_base.make_plugin_source(
|
||||||
|
searchpath=searchpath, identifier=self.__class__.__name__)
|
||||||
|
|
||||||
|
for plugin_name in self.source.list_plugins():
|
||||||
|
plugin = self.source.load_plugin(plugin_name)
|
||||||
|
plugin.setup(self)
|
||||||
|
|
||||||
|
def on_plugin_setup(self, plugin_class):
|
||||||
|
name = getattr(plugin_class, 'name', plugin_class.__name__)
|
||||||
|
self.tracer_map[name.lower()] = plugin_class
|
||||||
|
|
||||||
|
def plugin(self, name):
|
||||||
|
return self.tracer_map.get(name, None)
|
||||||
|
|
||||||
|
def create(self,
|
||||||
|
tracer_type,
|
||||||
|
tracer_config,
|
||||||
|
span_decorator=None,
|
||||||
|
**kwargs):
|
||||||
|
if not tracer_type:
|
||||||
|
return Tracer()
|
||||||
|
plugin_class = self.plugin(tracer_type.lower())
|
||||||
|
if not plugin_class:
|
||||||
|
raise RuntimeError('Tracer Plugin \'{}\' not installed!'.format(tracer_type))
|
||||||
|
|
||||||
|
tracer = plugin_class.create(tracer_config, span_decorator=span_decorator, **kwargs)
|
||||||
|
return tracer
|
|
@ -0,0 +1,33 @@
|
||||||
|
import logging
|
||||||
|
from jaeger_client import Config
|
||||||
|
from grpc_opentracing.grpcext import intercept_server
|
||||||
|
from grpc_opentracing import open_tracing_server_interceptor
|
||||||
|
from tracer import Tracer
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
PLUGIN_NAME = __name__
|
||||||
|
|
||||||
|
class JaegerFactory:
|
||||||
|
name = 'jaeger'
|
||||||
|
@classmethod
|
||||||
|
def create(cls, tracer_config, span_decorator=None, **kwargs):
|
||||||
|
tracing_config = tracer_config.TRACING_CONFIG
|
||||||
|
service_name = tracer_config.TRACING_SERVICE_NAME
|
||||||
|
validate = tracer_config.TRACING_VALIDATE
|
||||||
|
config = Config(config=tracing_config,
|
||||||
|
service_name=service_name,
|
||||||
|
validate=validate)
|
||||||
|
|
||||||
|
tracer = config.initialize_tracer()
|
||||||
|
tracer_interceptor = open_tracing_server_interceptor(
|
||||||
|
tracer,
|
||||||
|
log_payloads=tracer_config.TRACING_LOG_PAYLOAD,
|
||||||
|
span_decorator=span_decorator)
|
||||||
|
|
||||||
|
return Tracer(tracer, tracer_interceptor, intercept_server)
|
||||||
|
|
||||||
|
|
||||||
|
def setup(app):
|
||||||
|
logger.debug('Plugin \'{}\' Installed In Package: {}'.format(PLUGIN_NAME, app.plugin_package_name))
|
||||||
|
app.on_plugin_setup(JaegerFactory)
|
|
@ -1,40 +0,0 @@
|
||||||
import logging
|
|
||||||
from jaeger_client import Config
|
|
||||||
from grpc_opentracing.grpcext import intercept_server
|
|
||||||
from grpc_opentracing import open_tracing_server_interceptor
|
|
||||||
|
|
||||||
from tracing import (Tracer, empty_server_interceptor_decorator)
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class TracerFactory:
|
|
||||||
@classmethod
|
|
||||||
def new_tracer(cls,
|
|
||||||
tracer_type,
|
|
||||||
tracer_config,
|
|
||||||
span_decorator=None,
|
|
||||||
**kwargs):
|
|
||||||
if not tracer_type:
|
|
||||||
return Tracer()
|
|
||||||
config = tracer_config.TRACING_CONFIG
|
|
||||||
service_name = tracer_config.TRACING_SERVICE_NAME
|
|
||||||
validate = tracer_config.TRACING_VALIDATE
|
|
||||||
# if not tracer_type:
|
|
||||||
# tracer_type = 'jaeger'
|
|
||||||
# config = tracer_config.DEFAULT_TRACING_CONFIG
|
|
||||||
|
|
||||||
if tracer_type.lower() == 'jaeger':
|
|
||||||
config = Config(config=config,
|
|
||||||
service_name=service_name,
|
|
||||||
validate=validate)
|
|
||||||
|
|
||||||
tracer = config.initialize_tracer()
|
|
||||||
tracer_interceptor = open_tracing_server_interceptor(
|
|
||||||
tracer,
|
|
||||||
log_payloads=tracer_config.TRACING_LOG_PAYLOAD,
|
|
||||||
span_decorator=span_decorator)
|
|
||||||
|
|
||||||
return Tracer(tracer, tracer_interceptor, intercept_server)
|
|
||||||
|
|
||||||
assert False, 'Unsupported tracer type: {}'.format(tracer_type)
|
|
Loading…
Reference in New Issue