update k8s provider for sd

pull/232/head
peng.xu 2019-10-14 15:56:47 +08:00
parent b86859bb27
commit 8a432bc472
1 changed files with 63 additions and 45 deletions

View File

@ -1,7 +1,8 @@
import os import os
import sys import sys
if __name__ == '__main__': if __name__ == '__main__':
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(os.path.dirname(os.path.dirname(
os.path.abspath(__file__))))
import re import re
import logging import logging
@ -9,6 +10,7 @@ import time
import copy import copy
import threading import threading
import queue import queue
import enum
from kubernetes import client, config, watch from kubernetes import client, config, watch
from utils import singleton from utils import singleton
@ -19,6 +21,11 @@ logger = logging.getLogger(__name__)
INCLUSTER_NAMESPACE_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/namespace' INCLUSTER_NAMESPACE_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'
class EventType(enum.Enum):
PodHeartBeat = 1
Watch = 2
class K8SMixin: class K8SMixin:
def __init__(self, namespace, in_cluster=False, **kwargs): def __init__(self, namespace, in_cluster=False, **kwargs):
self.namespace = namespace self.namespace = namespace
@ -29,13 +36,22 @@ class K8SMixin:
self.namespace = open(INCLUSTER_NAMESPACE_PATH).read() self.namespace = open(INCLUSTER_NAMESPACE_PATH).read()
if not self.v1: if not self.v1:
config.load_incluster_config() if self.in_cluster else config.load_kube_config() config.load_incluster_config(
) if self.in_cluster else config.load_kube_config()
self.v1 = client.CoreV1Api() self.v1 = client.CoreV1Api()
class K8SHeartbeatHandler(threading.Thread, K8SMixin): class K8SHeartbeatHandler(threading.Thread, K8SMixin):
def __init__(self, message_queue, namespace, label_selector, in_cluster=False, **kwargs): def __init__(self,
K8SMixin.__init__(self, namespace=namespace, in_cluster=in_cluster, **kwargs) message_queue,
namespace,
label_selector,
in_cluster=False,
**kwargs):
K8SMixin.__init__(self,
namespace=namespace,
in_cluster=in_cluster,
**kwargs)
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.queue = message_queue self.queue = message_queue
self.terminate = False self.terminate = False
@ -45,13 +61,13 @@ class K8SHeartbeatHandler(threading.Thread, K8SMixin):
def run(self): def run(self):
while not self.terminate: while not self.terminate:
try: try:
pods = self.v1.list_namespaced_pod(namespace=self.namespace, label_selector=self.label_selector) pods = self.v1.list_namespaced_pod(
event_message = { namespace=self.namespace,
'eType': 'PodHeartBeat', label_selector=self.label_selector)
'events': [] event_message = {'eType': EventType.PodHeartBeat, 'events': []}
}
for item in pods.items: for item in pods.items:
pod = self.v1.read_namespaced_pod(name=item.metadata.name, namespace=self.namespace) pod = self.v1.read_namespaced_pod(name=item.metadata.name,
namespace=self.namespace)
name = pod.metadata.name name = pod.metadata.name
ip = pod.status.pod_ip ip = pod.status.pod_ip
phase = pod.status.phase phase = pod.status.phase
@ -59,13 +75,11 @@ class K8SHeartbeatHandler(threading.Thread, K8SMixin):
message = pod.status.message message = pod.status.message
ready = True if phase == 'Running' else False ready = True if phase == 'Running' else False
pod_event = dict( pod_event = dict(pod=name,
pod=name, ip=ip,
ip=ip, ready=ready,
ready=ready, reason=reason,
reason=reason, message=message)
message=message
)
event_message['events'].append(pod_event) event_message['events'].append(pod_event)
@ -82,7 +96,10 @@ class K8SHeartbeatHandler(threading.Thread, K8SMixin):
class K8SEventListener(threading.Thread, K8SMixin): class K8SEventListener(threading.Thread, K8SMixin):
def __init__(self, message_queue, namespace, in_cluster=False, **kwargs): def __init__(self, message_queue, namespace, in_cluster=False, **kwargs):
K8SMixin.__init__(self, namespace=namespace, in_cluster=in_cluster, **kwargs) K8SMixin.__init__(self,
namespace=namespace,
in_cluster=in_cluster,
**kwargs)
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.queue = message_queue self.queue = message_queue
self.terminate = False self.terminate = False
@ -96,7 +113,8 @@ class K8SEventListener(threading.Thread, K8SMixin):
def run(self): def run(self):
resource_version = '' resource_version = ''
w = watch.Watch() w = watch.Watch()
for event in w.stream(self.v1.list_namespaced_event, namespace=self.namespace, for event in w.stream(self.v1.list_namespaced_event,
namespace=self.namespace,
field_selector='involvedObject.kind=Pod'): field_selector='involvedObject.kind=Pod'):
if self.terminate: if self.terminate:
break break
@ -104,7 +122,7 @@ class K8SEventListener(threading.Thread, K8SMixin):
resource_version = int(event['object'].metadata.resource_version) resource_version = int(event['object'].metadata.resource_version)
info = dict( info = dict(
eType='WatchEvent', eType=EventType.Watch,
pod=event['object'].involved_object.name, pod=event['object'].involved_object.name,
reason=event['object'].reason, reason=event['object'].reason,
message=event['object'].message, message=event['object'].message,
@ -137,7 +155,8 @@ class EventHandler(threading.Thread):
while try_cnt > 0: while try_cnt > 0:
try_cnt -= 1 try_cnt -= 1
try: try:
pod = self.mgr.v1.read_namespaced_pod(name=event['pod'], namespace=self.namespace) pod = self.mgr.v1.read_namespaced_pod(name=event['pod'],
namespace=self.namespace)
if not pod.status.pod_ip: if not pod.status.pod_ip:
time.sleep(0.5) time.sleep(0.5)
continue continue
@ -147,13 +166,15 @@ class EventHandler(threading.Thread):
if try_cnt <= 0 and not pod: if try_cnt <= 0 and not pod:
if not event['start_up']: if not event['start_up']:
logger.error('Pod {} is started but cannot read pod'.format(event['pod'])) logger.error('Pod {} is started but cannot read pod'.format(
event['pod']))
return return
elif try_cnt <= 0 and not pod.status.pod_ip: elif try_cnt <= 0 and not pod.status.pod_ip:
logger.warn('NoPodIPFoundError') logger.warn('NoPodIPFoundError')
return return
logger.info('Register POD {} with IP {}'.format(pod.metadata.name, pod.status.pod_ip)) logger.info('Register POD {} with IP {}'.format(
pod.metadata.name, pod.status.pod_ip))
self.mgr.add_pod(name=pod.metadata.name, ip=pod.status.pod_ip) self.mgr.add_pod(name=pod.metadata.name, ip=pod.status.pod_ip)
def on_pod_killing(self, event, **kwargs): def on_pod_killing(self, event, **kwargs):
@ -178,7 +199,7 @@ class EventHandler(threading.Thread):
logger.info(self.mgr.conn_mgr.conn_names) logger.info(self.mgr.conn_mgr.conn_names)
def handle_event(self, event): def handle_event(self, event):
if event['eType'] == 'PodHeartBeat': if event['eType'] == EventType.PodHeartBeat:
return self.on_pod_heartbeat(event) return self.on_pod_heartbeat(event)
if not event or (event['reason'] not in ('Started', 'Killing')): if not event or (event['reason'] not in ('Started', 'Killing')):
@ -204,7 +225,8 @@ class EventHandler(threading.Thread):
class KubernetesProviderSettings: class KubernetesProviderSettings:
def __init__(self, namespace, pod_patt, label_selector, in_cluster, poll_interval, **kwargs): def __init__(self, namespace, pod_patt, label_selector, in_cluster,
poll_interval, **kwargs):
self.namespace = namespace self.namespace = namespace
self.pod_patt = pod_patt self.pod_patt = pod_patt
self.label_selector = label_selector self.label_selector = label_selector
@ -231,16 +253,15 @@ class KubernetesProvider(object):
if not self.namespace: if not self.namespace:
self.namespace = open(incluster_namespace_path).read() self.namespace = open(incluster_namespace_path).read()
config.load_incluster_config() if self.in_cluster else config.load_kube_config() config.load_incluster_config(
) if self.in_cluster else config.load_kube_config()
self.v1 = client.CoreV1Api() self.v1 = client.CoreV1Api()
self.listener = K8SEventListener( self.listener = K8SEventListener(message_queue=self.queue,
message_queue=self.queue, namespace=self.namespace,
namespace=self.namespace, in_cluster=self.in_cluster,
in_cluster=self.in_cluster, v1=self.v1,
v1=self.v1, **kwargs)
**kwargs
)
self.pod_heartbeater = K8SHeartbeatHandler( self.pod_heartbeater = K8SHeartbeatHandler(
message_queue=self.queue, message_queue=self.queue,
@ -249,13 +270,13 @@ class KubernetesProvider(object):
in_cluster=self.in_cluster, in_cluster=self.in_cluster,
v1=self.v1, v1=self.v1,
poll_interval=self.poll_interval, poll_interval=self.poll_interval,
**kwargs **kwargs)
)
self.event_handler = EventHandler(mgr=self, self.event_handler = EventHandler(mgr=self,
message_queue=self.queue, message_queue=self.queue,
namespace=self.namespace, namespace=self.namespace,
pod_patt=self.pod_patt, **kwargs) pod_patt=self.pod_patt,
**kwargs)
def add_pod(self, name, ip): def add_pod(self, name, ip):
self.conn_mgr.register(name, 'tcp://{}:19530'.format(ip)) self.conn_mgr.register(name, 'tcp://{}:19530'.format(ip))
@ -292,17 +313,14 @@ if __name__ == '__main__':
connect_mgr = Connect() connect_mgr = Connect()
settings = KubernetesProviderSettings( settings = KubernetesProviderSettings(namespace='xp',
namespace='xp', pod_patt=".*-ro-servers-.*",
pod_patt=".*-ro-servers-.*", label_selector='tier=ro-servers',
label_selector='tier=ro-servers', poll_interval=5,
poll_interval=5, in_cluster=False)
in_cluster=False)
provider_class = ProviderManager.get_provider('Kubernetes') provider_class = ProviderManager.get_provider('Kubernetes')
t = provider_class(conn_mgr=connect_mgr, t = provider_class(conn_mgr=connect_mgr, settings=settings)
settings=settings
)
t.start() t.start()
cnt = 100 cnt = 100
while cnt > 0: while cnt > 0: