Refactoring of the python multiprocessing code to use queues load balancing based on apply_async();

Use the returned result by apply_async() to fetch compile_worker() results and get rid of python queues;
Optimize the threads handling code
Reuse compile threads via self.mp_pool
pull/395/head
Mihail Stoyanov 2014-07-10 15:33:04 +03:00
parent 82e9c166f3
commit fab45821a7
1 changed files with 42 additions and 52 deletions

View File

@ -28,6 +28,7 @@ from workspace_tools.settings import BUILD_OPTIONS, MBED_ORG_USER
from multiprocessing import Pool, Manager, cpu_count
from time import sleep
from pprint import pprint
import workspace_tools.hooks as hooks
import re
@ -66,19 +67,16 @@ def print_notify_verbose(event):
elif event['type'] == 'progress':
print_notify(event) # standard handle
def compile_worker(jobs):
for job in jobs:
_, stderr, rc = run_cmd(job['command'], job['work_dir'])
def compile_worker(job):
_, stderr, rc = run_cmd(job['command'], job['work_dir'])
job['queue'].put({
'code': rc,
'output': stderr,
'command': job['command'],
'source': job['source'],
'object': job['object']
})
return True
return {
'code': rc,
'output': stderr,
'command': job['command'],
'source': job['source'],
'object': job['object']
}
class Resources:
def __init__(self, base_path=None):
@ -237,6 +235,12 @@ class mbedToolchain:
self.CHROOT = None
self.mp_pool = None
def __exit__():
if self.mp_pool is not None:
self.mp_pool.terminate()
def goanna_parse_line(self, line):
if "analyze" in self.options:
return self.GOANNA_DIAGNOSTIC_PATTERN.match(line)
@ -500,41 +504,27 @@ class mbedToolchain:
return objects
def compile_queue(self, queue, objects):
manager = Manager()
q = manager.Queue()
groups = []
groups_count = int(self.jobs if self.jobs else cpu_count())
for i in range(groups_count):
groups.append([])
jobs_count = int(self.jobs if self.jobs else cpu_count())
if self.mp_pool is None or self.mp_pool._state == 2: # TERMINATE
self.mp_pool = Pool(processes=jobs_count)
results = []
for i in range(len(queue)):
queue[i]['queue'] = q
g = i % groups_count
groups[g].append(queue[i])
results.append(self.mp_pool.apply_async(compile_worker, [queue[i]]))
p = Pool(processes=groups_count)
m = p.map_async(compile_worker, groups)
done = False
itr = 0
while True:
if itr > 3000:
raise ToolException("Compile did not finish in less than 5 minutes")
itr = itr + 1
itr += 1
if itr > 6000:
self.mp_pool.terminate()
raise ToolException("Compile did not finish in 5 minutes")
results = []
pending = 0
for r in results:
if r._ready is True:
result = r.get()
results.remove(r)
try:
while not q.empty():
results.append(q.get())
except EOFError:
p.terminate()
raise ToolException("Failed to spawn child process")
if len(results):
for result in results:
objects.append(result['object'])
self.compiled += 1
self.progress("compile", result['source'], build_update=True)
@ -546,20 +536,20 @@ class mbedToolchain:
result['command']
])
except ToolException, err:
p.terminate()
self.mp_pool.terminate()
raise ToolException(err)
else:
pending += 1
if pending > jobs_count:
break
if done:
if len(results) == 0:
break
if m.ready():
done = True
#let it run one more time to gather any results left in the queue
continue #skip the sleep
sleep(0.01)
sleep(0.1)
p.terminate()
results = None
return objects